Source code for kernel.basicSchedule
# -*- coding: utf-8 -*-
"""
Definition of the class Schedule
"""
from tqdm import trange
[docs]class Schedule(object):
""" Schedule Class """
def __init__(self, name, model):
""" Initialize the schedule"""
self.name = name
self.model = model
self.scenario_name = " "
[docs] def execute(self, scenario_name,
step_unit,
step_interval,
no_of_steps,
run_nr):
""" Interface for executing methods of schedule objects
Implemented by subclass
"""
raise NotImplementedError("Must subclass me")
# TODO: Definir a EventSchedule
# TODO: Eventualmente, implementar o Agente para diferentes tipos de schd
# TODO: Montar um modelo macroeconômico básico (Usando Hilder)
[docs]class EventSchedule(Schedule):
""" An Event Schedule """
def __init__(self, name, model):
""" PoolSchedule initialization """
super().__init__(name, model)
self.events = dict()
[docs] def collect_event(self, an_event):
self.events[an_event.event_id] = an_event
[docs] def execute(self, scenario_name,
step_unit,
step_interval,
no_of_steps,
run_nr):
pass
[docs]class PoolSchedule(Schedule):
""" A pool schedule """
def __init__(self, name, model):
""" PoolSchedule initialization """
super().__init__(name, model)
self.run_nr = " "
self.step = 0
[docs] def execute(self, scenario_name,
step_unit,
step_interval,
no_of_steps,
run_nr):
""" Executes the Pool Schedule """
self.run_nr = run_nr
self.scenario_name = scenario_name
self.status = "Scenario: " + self.scenario_name + " Run nr.: " + str(self.run_nr)
if step_unit == 'step':
for this_step in trange(0, no_of_steps, step_interval, desc = self.status):
self.step = this_step
for agent_name, agent in self.model.agents.items():
agent.dev_step(self.step)
for space_name, space in self.model.spaces.items():
space.update()
for observer_name, observer in self.model.agent_observers.items():
observer.observe(self.step)
else:
raise Exception(step_unit,
"is not valid as step unity")
[docs]class MixedSchedule(Schedule):
""" A mixed schedule for random execution"""
def __init__(self, name, model):
""" Mixed schedule initialization """
super().__init__(name, model)
self.run_nr = " "
[docs] def execute(self, scenario_name,
step_unit,
step_interval,
no_of_steps,
run_nr):
""" Executes the Mixed Schedule """
self.run_nr = run_nr
self.scenario_name = scenario_name
self.status = "Scenario: " + self.scenario_name + " Run nr.: " + str(self.run_nr)
if step_unit == 'step':
for this_step in trange(0, no_of_steps, step_interval, desc=self.status):
self.step = this_step
for agent in self.model.mixed_agents():
agent.dev_step(this_step)
for space in self.model.mixed_spaces():
space.update()
for observer_name, observer in self.model.agent_observers.items():
observer.observe(this_step)
else:
raise Exception(step_unit,
"is not valid as step unity")