Source code for examples.macro_model.spaces.market
# -*- coding: utf-8 -*-
""" Basic Market Class implementation """
from basicSpaces import Space
from sortedcontainers import SortedDict
[docs]class Market(Space):
""" Abstract Market """
BID_TYPE = ['O', 'D']
def __init__(self, model, name, actions_set_file, action_class):
""" Intialize abstract market """
super().__init__(model, name, actions_set_file, action_class)
self.offers = SortedDict()
self.demmand = SortedDict()
self.contracts = {}
[docs] def update(self):
""" Implemented by subclass - Testing update """
self.match_bids()
[docs] def match_bids(self):
"""
Market matching of offer and demmand
This method can be specialized depending on market
Is prepared to be an assincronous method
"""
self.bids_not_matched = True
self.contracted_offers = {}
self.total_contracted_value = 0.0
while self.bids_not_matched:
if self.has_demmand():
self.a_demmand = self.get_highest_demmand()
self.demmand_not_satisfied = True
self.demmand_owner = self.a_demmand.owner_of_g
self.total_contracted_value = 0.0
while self.demmand_not_satisfied:
if self.has_offers():
self.an_offer = self.get_lowest_offer()
self.an_offer.owner_of_g = self.demmand_owner
self.contracted_offers[self.an_offer.producer_of_g] = self.an_offer
self.total_contracted_value += self.an_offer.value_of_g
if self.total_contracted_value >= self.an_offer.value_of_g:
self.demmand_not_satisfied = False
else:
self.demmand_not_satisfied = False
self.release_demmand()
self.demmand.clear()
self.notify_match(self.a_demmand, self.contracted_offers)
self.register_contract(self.a_demmand, self.contracted_offers)
else:
self.bids_not_matched = False
self.release_offers()
self.offers.clear()
[docs] def bid_market(self, bid_type, a_good):
""" include an offer in the market """
if bid_type in self.BID_TYPE:
if bid_type == 'O':
self.offers[a_good.value_of_g] = a_good
else:
if bid_type == 'D':
self.demmand[a_good.value_of_g] = a_good
else:
raise Exception("Type of bid not valid - type: ", bid_type)
[docs] def notify_match(self, a_demmand, contracted_offers):
""" Notify the agents that their bids where matched """
self.contractor = a_demmand.owner_of_g
self.contractor.get_contracted_offers(contracted_offers)
for offer in contracted_offers.values():
offer.producer_of_g.got_contract()
[docs] def register_contract(self, a_demmand, contracted_offers):
""" The matched bids become contracts """
self.contractor = a_demmand.owner_of_g
self.contracts[self.contractor] = contracted_offers
self.contractor.get_contracted_offers(contracted_offers)
[docs] def release_offers(self):
""" When some bid passed the timeout of the system, the market release the bid """
for bid, an_offer in self.offers.items():
an_offer.producer_of_g.release_offer()
[docs] def release_demmand(self):
""" When some bid passed the timeout of the system, the market release the bid """
for bid, a_demmand in self.demmand.items():
a_demmand.producer_of_g.release_demmand()
[docs] def init_offers(self, new_offers):
""" Get new offers dict and check if is a sorted dict - if yes, set it """
if isinstance(new_offers, SortedDict):
self.offers = new_offers
else:
print('The input is not a sorted dictionary')
[docs] def offer_gs(self, offered_good):
""" include an offer in the market """
self.offers[offered_good.value_of_g] = offered_good
[docs] def has_offers(self):
""" A market answers if is has offers (True or False) """
if self.offers.__len__() > 0:
return True
else:
return False
[docs] def has_demmand(self):
""" A market answers if is has demmand (True or False) """
if self.demmand.__len__() > 0:
return True
else:
return False
[docs] def no_of_offers(self):
""" A market answers the number of offers it has """
return self.offers.__len__()
[docs] def get_lowest_offer(self):
""" A market answers the offer with the lowest value (in the self.offers ordered dict) """
self.key, self.value = self.offers.popitem(0)
return self.value
[docs] def get_highest_offer(self):
""" A market answers the offer with the highest value (in the self.offers ordered dict) """
self.key, self.value = self.offers.popitem(-1)
return self.value
[docs] def get_lowest_demmand(self):
""" A market answers the demmand with the lowest value (in the self.demmand ordered dict) """
self.key, self.value = self.demmand.popitem(0)
return self.value
[docs] def get_highest_demmand(self):
""" A market answers the demmand with the highest value (in the self.demmand ordered dict) """
self.key, self.value = self.demmand.popitem(-1)
return self.value
[docs] def return_offer(self, gs):
""" A market receives an offer of a good or service """
self.offer_gs(gs)
[docs] def show_offer(self, an_agent):
""" Show offer """
an_agent.show_offer()
print("")
print("Model: ", an_agent.model.name)
print("------------")
[docs] def compute_production_price(self):
""" Agent compute production price """
pass
[docs] def pay_taxes(self, an_agent):
""" An agent pay taxes """
pass