class DummyVecMultiAgentEnv(VecEnv):
    """Vectorized multi-agent environment that runs its environments sequentially.

    The step and reset commands are sent to one environment at a time, inside
    the calling process. Useful when debugging and when num_envs == 1 (in the
    latter case it avoids communication overhead).

    Parameters:
        env_fns: sized collection of callables; each one is called as
            ``fn(env_seed=...)`` and must return one environment.
        env_seed: base seed; the i-th environment is built with ``env_seed + i``.
    """

    def __init__(self, env_fns, env_seed):
        self.waiting = False
        self.closed = False
        # Each environment gets its own seed, offset by its position in env_fns.
        self.envs = [fn(env_seed=env_seed + inx_env) for inx_env, fn in enumerate(env_fns)]
        # The first environment is used as the template for spaces and metadata.
        env = self.envs[0]
        VecEnv.__init__(self, len(env_fns), env.observation_space, env.action_space)
        self.env_info = env.env_info
        self.groups_info = env.groups_info
        self.agents = env.agents
        self.num_agents = env.num_agents
        self.state_space = env.state_space  # Type: Box

        # Per-environment buffers, overwritten by reset() and step_wait().
        self.buf_state = [np.zeros(space2shape(self.state_space)) for _ in range(self.num_envs)]
        self.buf_obs = [{} for _ in range(self.num_envs)]
        self.buf_avail_actions = [{} for _ in range(self.num_envs)]
        self.buf_info = [{} for _ in range(self.num_envs)]
        self.actions = None
        self.max_episode_steps = env.max_episode_steps

    def reset(self):
        """Reset every environment, one after another.

        Each environment's ``reset()`` must return ``(obs, info)`` where
        ``info`` contains the keys ``'state'`` and ``'avail_actions'``.

        Returns:
            A tuple ``(observations, infos)``: shallow copies of the
            per-environment observation and info buffers.
        """
        for e in range(self.num_envs):
            self.buf_obs[e], self.buf_info[e] = self.envs[e].reset()
            self.buf_state[e] = self.buf_info[e]['state']
            self.buf_avail_actions[e] = self.buf_info[e]['avail_actions']
        return self.buf_obs.copy(), self.buf_info.copy()

    def step_async(self, actions):
        """Store the actions that the next ``step_wait()`` call will execute.

        Nothing is stepped here; the environments are stepped in ``step_wait()``.

        Parameters:
            actions: one entry per environment. If ``actions`` has no length or
                its length differs from ``num_envs``, it is treated as the
                action for a single environment and wrapped in a list.

        Raises:
            AlreadySteppingError: if a previous ``step_async()`` has not been
                completed by ``step_wait()``.
            AssertionError: if ``actions`` cannot be matched to the environments.
        """
        if self.waiting:
            raise AlreadySteppingError
        listify = True
        try:
            if len(actions) == self.num_envs:
                listify = False
        except TypeError:
            # actions has no len(): fall through to the single-environment case.
            pass
        if not listify:
            self.actions = actions
        else:
            assert self.num_envs == 1, "actions {} is either not a list or has a wrong size - cannot match to {} environments".format(actions, self.num_envs)
            self.actions = [actions]
        self.waiting = True

    def step_wait(self):
        """Step every environment with the stored actions and collect the results.

        When all agents of an environment are terminated, or the environment is
        truncated, that environment is reset immediately. The post-reset
        observation, available actions and state are stored in its info dict
        under ``"reset_obs"``, ``"reset_avail_actions"`` and ``"reset_state"``,
        while the returned observation is still the final one of the episode.

        Returns:
            A tuple ``(observations, rewards, terminated, truncated, infos)``
            with one entry per environment.

        Raises:
            NotSteppingError: if ``step_async()`` was not called first.
        """
        if not self.waiting:
            raise NotSteppingError
        rew_dict = [{} for _ in self.envs]
        terminated_dict = [{} for _ in self.envs]
        truncated = [False for _ in self.envs]
        for e in range(self.num_envs):
            action_n = self.actions[e]
            self.buf_obs[e], rew_dict[e], terminated_dict[e], truncated[e], self.buf_info[e] = self.envs[e].step(action_n)
            self.buf_avail_actions[e] = self.buf_info[e]['avail_actions']
            self.buf_state[e] = self.buf_info[e]['state']
            if all(terminated_dict[e].values()) or truncated[e]:
                obs_reset_dict, info_reset = self.envs[e].reset()
                self.buf_info[e]["reset_obs"] = obs_reset_dict
                self.buf_info[e]["reset_avail_actions"] = info_reset['avail_actions']
                self.buf_info[e]["reset_state"] = info_reset['state']
        self.waiting = False
        return self.buf_obs.copy(), rew_dict, terminated_dict, truncated, self.buf_info.copy()

    def close_extras(self):
        """Mark the vectorized environment as closed and close every environment.

        Closing is best-effort: an error raised by one environment's
        ``close()`` is ignored so that the remaining environments are still
        closed.
        """
        self.closed = True
        for env in self.envs:
            try:
                env.close()
            except Exception:
                # Deliberately best-effort, but only for ordinary errors:
                # KeyboardInterrupt and SystemExit must still propagate.
                pass
def step_wait(self):
    """Step every environment with the stored actions and track battle statistics.

    For each environment the stored action is executed and the observation,
    info, available-actions and state buffers are refreshed. When an episode
    ends (all agents terminated, or the environment truncated) the
    environment is reset, the post-reset data is stored in its info dict
    under ``"reset_obs"``, ``"reset_avail_actions"`` and ``"reset_state"``,
    and the per-environment counters ``battles_game``, ``battles_won``,
    ``dead_allies_count`` and ``dead_enemies_count`` are updated from the
    final info dict of that episode.

    Returns:
        A tuple ``(observations, rewards, terminated, truncated, infos)``
        with one entry per environment.

    Raises:
        NotSteppingError: if no step is pending.
    """
    # NOTE(review): the nesting of the statistics updates was reconstructed
    # from a whitespace-stripped listing - confirm against the upstream file.
    if not self.waiting:
        raise NotSteppingError

    rewards = [{} for _ in self.envs]
    terminations = [{} for _ in self.envs]
    truncations = [False for _ in self.envs]

    for idx in range(self.num_envs):
        chosen_action = self.actions[idx]
        obs, reward, terminated, truncated, info = self.envs[idx].step(chosen_action)
        self.buf_obs[idx] = obs
        rewards[idx] = reward
        terminations[idx] = terminated
        truncations[idx] = truncated
        self.buf_info[idx] = info
        self.buf_avail_actions[idx] = info['avail_actions']
        self.buf_state[idx] = info['state']

        episode_over = all(terminated.values()) or truncated
        if not episode_over:
            continue

        # Episode finished: reset now and hand the fresh data over via info.
        fresh_obs, fresh_info = self.envs[idx].reset()
        info["reset_obs"] = fresh_obs
        info["reset_avail_actions"] = fresh_info['avail_actions']
        info["reset_state"] = fresh_info['state']

        # Battle statistics are accumulated once per finished episode.
        self.battles_game[idx] += 1
        if info['battle_won']:
            self.battles_won[idx] += 1
        self.dead_allies_count[idx] += info['dead_allies']
        self.dead_enemies_count[idx] += info['dead_enemies']

    self.waiting = False
    return self.buf_obs.copy(), rewards, terminations, truncations, self.buf_info.copy()
def step_wait(self):
    """Step every environment with the stored actions and track win statistics.

    For each environment the stored action is executed and the observation,
    info, available-actions and state buffers are refreshed. When an episode
    ends (all agents terminated, or the environment truncated) the
    environment is reset, the post-reset data is stored in its info dict
    under ``"reset_obs"``, ``"reset_avail_actions"`` and ``"reset_state"``,
    ``battles_game`` is incremented, and ``battles_won`` is incremented when
    the final info dict reports a positive ``'score_reward'``.

    Returns:
        A tuple ``(observations, rewards, terminated, truncated, infos)``
        with one entry per environment.

    Raises:
        NotSteppingError: if no step is pending.
    """
    # NOTE(review): the nesting of the statistics updates was reconstructed
    # from a whitespace-stripped listing - confirm against the upstream file.
    if not self.waiting:
        raise NotSteppingError

    rewards = [{} for _ in self.envs]
    terminations = [{} for _ in self.envs]
    truncations = [False for _ in self.envs]

    for idx in range(self.num_envs):
        chosen_action = self.actions[idx]
        obs, reward, terminated, truncated, info = self.envs[idx].step(chosen_action)
        self.buf_obs[idx] = obs
        rewards[idx] = reward
        terminations[idx] = terminated
        truncations[idx] = truncated
        self.buf_info[idx] = info
        self.buf_avail_actions[idx] = info['avail_actions']
        self.buf_state[idx] = info['state']

        episode_over = all(terminated.values()) or truncated
        if not episode_over:
            continue

        # Episode finished: reset now and hand the fresh data over via info.
        fresh_obs, fresh_info = self.envs[idx].reset()
        info["reset_obs"] = fresh_obs
        info["reset_avail_actions"] = fresh_info['avail_actions']
        info["reset_state"] = fresh_info['state']

        # One more finished game; it counts as won on a positive score reward.
        self.battles_game[idx] += 1
        if info['score_reward'] > 0:
            self.battles_won[idx] += 1

    self.waiting = False
    return self.buf_obs.copy(), rewards, terminations, truncations, self.buf_info.copy()