Source code for rog_rl.env

import gym
from gym import spaces, wrappers
from gym.utils import seeding

from enum import Enum
import numpy as np


from rog_rl.agent_state import AgentState
from rog_rl.model import DiseaseSimModel
from rog_rl.vaccination_response import VaccinationResponse


class ActionType(Enum):
    STEP = 0
    VACCINATE = 1

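# A minimal sketch of the action encoding (cell coordinates are
# illustrative): an action is a 3-tuple [action_type, cell_x, cell_y]
# drawn from the MultiDiscrete action space defined below, e.g.
#
#   >>> [ActionType.STEP.value, 0, 0]       # advance the simulation one tick
#   >>> [ActionType.VACCINATE.value, 4, 2]  # vaccinate the cell at (4, 2)
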
class RogSimEnv(gym.Env):
    def __init__(self, config={}):
        # Setup Config
        self.default_config = dict(
            width=50,
            height=50,
            population_density=0.75,
            vaccine_density=0.05,
            initial_infection_fraction=0.1,
            initial_vaccination_fraction=0.05,
            prob_infection=0.2,
            prob_agent_movement=0.0,
            disease_planner_config={
                "latent_period_mu": 2 * 4,
                "latent_period_sigma": 0,
                "incubation_period_mu": 5 * 4,
                "incubation_period_sigma": 0,
                "recovery_period_mu": 14 * 4,
                "recovery_period_sigma": 0,
            },
            max_simulation_timesteps=200,
            early_stopping_patience=14,
            use_renderer=False,  # can be "human", "ansi"
            toric=True,
            dummy_simulation=False,
            debug=False)
        self.config = {}
        self.config.update(self.default_config)
        self.config.update(config)

        self.dummy_simulation = self.config["dummy_simulation"]
        self.debug = self.config["debug"]

        self.width = self.config["width"]
        self.height = self.config["height"]

        self.use_renderer = self.config["use_renderer"]
        self.action_space = spaces.MultiDiscrete(
            [
                len(ActionType),
                self.width,
                self.height
            ])
        self.observation_space = spaces.Box(
            low=np.float32(0),
            high=np.float32(1),
            shape=(
                self.width,
                self.height,
                len(AgentState)))

        self._model = None
        self.running_score = None
        self.np_random = np.random

        self.renderer = False
        if self.use_renderer:
            self.initialize_renderer(mode=self.use_renderer)

        self.cumulative_reward = 0

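    # A minimal construction sketch (values are illustrative): any key of
    # default_config can be overridden through the `config` argument, and
    # everything else keeps its default value.
    #
    #   >>> env = RogSimEnv(config={"width": 10, "height": 10, "debug": True})
    #   >>> env.observation_space.shape   # (10, 10, len(AgentState))
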
    def set_renderer(self, renderer):
        self.use_renderer = renderer
        if self.use_renderer:
            self.initialize_renderer(mode=self.use_renderer)

    def reset(self):
        # Delete the model if it already exists
        if self._model:
            del self._model

        if self.dummy_simulation:
            """
            In dummy simulation mode,
            return a randomly sampled observation
            """
            return self.observation_space.sample()

        width = self.config['width']
        height = self.config['height']
        population_density = self.config['population_density']
        vaccine_density = self.config['vaccine_density']
        initial_infection_fraction = self.config['initial_infection_fraction']
        initial_vaccination_fraction = \
            self.config['initial_vaccination_fraction']
        prob_infection = self.config['prob_infection']
        prob_agent_movement = self.config['prob_agent_movement']
        disease_planner_config = self.config['disease_planner_config']
        max_simulation_timesteps = self.config['max_simulation_timesteps']
        early_stopping_patience = \
            self.config['early_stopping_patience']
        toric = self.config['toric']

        """
        Seeding Strategy:
            - The env maintains a custom (optionally seeded) np.random
              instance, accessible at self.np_random.
              Whenever env.seed() is called, this np_random instance is
              seeded, and every new instantiation of a DiseaseSimModel
              is seeded with a random number sampled from self.np_random.
        """
        _simulator_instance_seed = self.np_random.rand()
        # Instantiate the Disease Model
        self._model = DiseaseSimModel(
            width, height,
            population_density, vaccine_density,
            initial_infection_fraction, initial_vaccination_fraction,
            prob_infection, prob_agent_movement,
            disease_planner_config,
            max_simulation_timesteps,
            early_stopping_patience,
            toric, seed=_simulator_instance_seed
        )

        # Set the max timesteps of the env as the sum of:
        #   - max_simulation_timesteps
        #   - the number of vaccines available
        self._max_episode_steps = self.config['max_simulation_timesteps'] + \
            self._model.n_vaccines

        # Tick the model once
        self._model.tick()

        self.running_score = self.get_current_game_score()
        self.cumulative_reward = 0
        # Return the initial observation
        return self._model.get_observation()

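    # A reproducibility sketch (illustrative): seeding the env before reset
    # fixes the seed sampled for the underlying DiseaseSimModel, making the
    # episode deterministic.
    #
    #   >>> env.seed(42)
    #   >>> obs = env.reset()  # ndarray of shape (width, height, len(AgentState))
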
    def initialize_renderer(self, mode="human"):
        if mode in ["human", "rgb_array"]:
            self.metadata = {'render.modes': ['human', 'rgb_array'],
                             'video.frames_per_second': 5}
            from rog_rl.renderer import Renderer
            self.renderer = Renderer(
                grid_size=(self.width, self.height)
            )
        else:
            """
            Initialize ANSI Renderer here
            """
            self.metadata = {'render.modes': ['human', 'ansi'],
                             'video.frames_per_second': 5}
            from rog_rl.renderer import ANSIRenderer
            self.renderer = ANSIRenderer()

        self.renderer.setup(mode=mode)

    def update_renderer(self, mode='human'):
        """
        Updates the latest board state on the renderer
        """
        # Draw Renderer
        # Update Renderer State
        model = self._model
        scheduler = model.get_scheduler()
        total_agents = scheduler.get_agent_count()
        state_metrics = self.get_current_game_metrics()

        initial_vaccines = int(
            model.initial_vaccination_fraction * model.n_agents)
        _vaccines_given = \
            model.max_vaccines - model.n_vaccines - initial_vaccines
        _simulation_steps = int(scheduler.steps)
        # Game Steps includes steps in which each agent is vaccinated
        _game_steps = _simulation_steps + _vaccines_given

        self.renderer.update_stats(
            "SCORE", "{:.3f}".format(self.cumulative_reward))
        self.renderer.update_stats("VACCINE_BUDGET", "{}".format(
            model.n_vaccines))
        self.renderer.update_stats("SIMULATION_TICKS", "{}".format(
            _simulation_steps))
        self.renderer.update_stats("GAME_TICKS", "{}".format(_game_steps))

        for _state in AgentState:
            key = "population.{}".format(_state.name)
            stats = state_metrics[key]
            self.renderer.update_stats(
                key,
                "{} ({:.2f}%)".format(
                    int(stats * total_agents),
                    stats * 100
                )
            )
            if mode in ["human", "rgb_array"]:
                color = self.renderer.COLOR_MAP.get_color(_state)
                agents = scheduler.get_agents_by_state(_state)
                for _agent in agents:
                    _agent_x, _agent_y = _agent.pos
                    self.renderer.draw_cell(
                        _agent_x, _agent_y,
                        color
                    )
        if mode in ["human", "rgb_array"]:
            # Update the rest of the renderer
            self.renderer.pre_render()

            # Only in case of recording via Monitor, or when
            # mode = "rgb_array", do we require the rgb image
            if isinstance(self, wrappers.Monitor):
                return_rgb_array = mode in ["human", "rgb_array"]
            else:
                return_rgb_array = mode == "rgb_array"
            render_output = self.renderer.post_render(return_rgb_array)
            return render_output
        elif mode == "ansi":
            render_output = self.renderer.render(self._model.grid)
            if self.debug:
                print(render_output)
            return render_output

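    # A rendering sketch (illustrative): with use_renderer="ansi" the board
    # is returned as text (and printed when debug=True); with "human" or
    # "rgb_array" a window or an RGB frame is produced instead.
    #
    #   >>> env = RogSimEnv(config={"use_renderer": "ansi"})
    #   >>> obs = env.reset()
    #   >>> frame = env.render(mode="ansi")
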
    def get_current_game_score(self):
        """
        Returns the current game score

        The game score is currently represented as:
            (fraction of susceptible agents left in the population)
        """
        return self._model.get_population_fraction_by_state(
            AgentState.SUSCEPTIBLE
        )

    def get_current_game_metrics(self, dummy_simulation=False):
        """
        Returns a dictionary containing important game metrics
        """
        _d = {}
        # Current population fraction of the different states
        for _state in AgentState:
            if not dummy_simulation:
                _value = self._model.get_population_fraction_by_state(_state)
            else:
                _value = self.np_random.rand()

            _key = "population.{}".format(_state.name)
            _d[_key] = _value
        # Add R0 to the game metrics
        _d["R0/10"] = self._model.contact_network.compute_R0() / 10.0
        return _d

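    # A sketch of the returned metrics dictionary (values are illustrative;
    # there is one "population.<STATE>" entry per AgentState member):
    #
    #   {
    #       "population.SUSCEPTIBLE": 0.72,
    #       ...
    #       "R0/10": 0.25,
    #   }
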
    def step(self, action):
        # Handle dummy_simulation mode
        if self.dummy_simulation:
            return self.dummy_env_step()

        assert self.action_space.contains(
            action), "%r (%s) invalid" % (action, type(action))
        if self._model is None:
            raise Exception("env.step() called before calling env.reset()")

        action = [int(x) for x in action]
        if self.debug:
            print("Action : ", action)

        # Handle action propagation in the real simulator
        action_type = action[0]
        cell_x = action[1]
        cell_y = action[2]

        _observation = False
        _done = False
        _info = {}

        if action_type == ActionType.STEP.value:
            self._model.tick()
            _observation = self._model.get_observation()
        elif action_type == ActionType.VACCINATE.value:
            vaccination_success, response = \
                self._model.vaccinate_cell(cell_x, cell_y)
            _observation = self._model.get_observation()

            # Force-run the simulation to completion if
            # we run out of vaccines
            if response == VaccinationResponse.AGENT_VACCINES_EXHAUSTED:
                while self._model.is_running():
                    self._model.tick()
                    _observation = self._model.get_observation()

        # Compute the difference in game score
        current_score = self.get_current_game_score()
        _step_reward = current_score - self.running_score
        self.cumulative_reward += _step_reward
        self.running_score = current_score

        # Add custom game metrics to the info dict
        game_metrics = self.get_current_game_metrics()
        for _key in game_metrics.keys():
            _info[_key] = game_metrics[_key]

        _done = not self._model.is_running()
        return _observation, _step_reward, _done, _info

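    # A minimal interaction sketch (cell coordinates are illustrative). The
    # step reward is the change in the susceptible fraction between
    # consecutive steps, and env.cumulative_reward tracks its running sum.
    #
    #   >>> obs = env.reset()
    #   >>> obs, reward, done, info = env.step(
    #   ...     [ActionType.VACCINATE.value, 4, 2])   # vaccinate cell (4, 2)
    #   >>> obs, reward, done, info = env.step(
    #   ...     [ActionType.STEP.value, 0, 0])        # tick the simulation
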
    def dummy_env_step(self):
        """
        Implements a fake env.step for faster integration testing
        with RL experiment frameworks
        """
        observation = self.observation_space.sample()
        reward = self.np_random.rand()
        done = True if self.np_random.rand() < 0.01 else False
        info = {}
        game_metrics = self.get_current_game_metrics(dummy_simulation=True)
        info.update(game_metrics)
        return observation, reward, done, info

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def render(self, mode='human'):
        """
        This method provides the option to render the
        environment's behavior to a window which should be
        readable to the human eye if mode is set to 'human'.
        """
        if not self.use_renderer:
            return

        if not self.renderer:
            self.initialize_renderer(mode=mode)

        return self.update_renderer(mode=mode)

    def close(self):
        if self.renderer:
            self.renderer.close()
            self.renderer = False

        if self._model:
            # Delete the model instance if it exists
            self._model = None

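# A random-agent rollout sketch (illustrative), tying the API together:
#
#   >>> env = RogSimEnv(config={"width": 10, "height": 10})
#   >>> env.seed(0)
#   >>> obs, done = env.reset(), False
#   >>> while not done:
#   ...     obs, reward, done, info = env.step(env.action_space.sample())
#   >>> env.close()
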
if __name__ == "__main__":
    render = "ansi"  # change to "human"
    env_config = dict(
        width=5,
        height=5,
        population_density=1.0,
        vaccine_density=1.0,
        initial_infection_fraction=0.04,
        initial_vaccination_fraction=0,
        prob_infection=0.2,
        prob_agent_movement=0.0,
        disease_planner_config={
            "latent_period_mu": 2 * 4,
            "latent_period_sigma": 0,
            "incubation_period_mu": 5 * 4,
            "incubation_period_sigma": 0,
            "recovery_period_mu": 14 * 4,
            "recovery_period_sigma": 0,
        },
        max_simulation_timesteps=200,
        early_stopping_patience=14,
        use_renderer=render,
        toric=False,
        dummy_simulation=False,
        debug=True)
    env = RogSimEnv(config=env_config)
    print("USE RENDERER ?", env.use_renderer)
    record = False
    if record:
        # Records the rendering in the `recording` folder
        env = wrappers.Monitor(env, "recording", force=True)
    observation = env.reset()
    done = False
    k = 0

    env.render(mode=render)
    while not done:
        _action = input("Enter action - ex: 1 4 2 : ")
        if _action.strip() == "":
            _action = env.action_space.sample()
        else:
            _action = [int(x) for x in _action.split()]
            assert _action[0] in [0, 1]
            assert _action[1] in list(range(env._model.width))
            assert _action[2] in list(range(env._model.height))
        print("Action : ", _action)
        observation, reward, done, info = env.step(_action)
        env.render(mode=render)
        k += 1
        # print(observation.shape)
        # print(k, reward, done)