class RoboticWarehouseEnv(RawMultiAgentEnv):
    """Multi-agent adapter around the gym environment named by ``config.env_id``.

    The wrapped environment exposes per-agent spaces, observations and rewards
    as index-ordered sequences; this class re-keys them by agent name
    (``'agent_0'``, ``'agent_1'``, ...) so that every per-agent quantity is a
    dictionary.

    Note:
        To make this environment successfully, the gym version is suggested
        to be 0.21.0.

    NOTE(review): the calls below use ``render_mode=``, ``reset(seed=...)``
    and a five-value ``step`` return - confirm that the recommended gym
    version actually matches the installed environment package.

    Args:
        config: Object providing the attributes ``env_id``, ``render_mode``,
            ``env_seed`` and ``max_episode_steps``.
    """

    def __init__(self, config):
        super().__init__()
        self.env = gym.make(config.env_id, render_mode=config.render_mode)

        # One entry of the wrapped action space per agent.
        self.num_agents = len(self.env.action_space)
        self.agents = [f'agent_{i}' for i in range(self.num_agents)]

        # Seed the wrapped environment once, at construction time.
        self.seed = config.env_seed
        self.env.reset(seed=self.seed)

        # Re-key the index-ordered spaces by agent name.
        self.observation_space = {
            name: self.env.observation_space[idx]
            for idx, name in enumerate(self.agents)
        }
        self.action_space = {
            name: self.env.action_space[idx]
            for idx, name in enumerate(self.agents)
        }

        # The state dimension is the sum of the last observation axis over all agents.
        self.dim_state = sum(
            self.observation_space[name].shape[-1] for name in self.agents
        )
        self.state_space = Box(-np.inf, np.inf, shape=[self.dim_state, ], dtype=np.float32)

        # NOTE(review): max_episode_steps is stored but not enforced in step();
        # presumably an outer wrapper reads it together with _episode_step - confirm.
        self.max_episode_steps = config.max_episode_steps
        self._episode_step = 0  # number of steps taken in the current episode

    def close(self):
        """Close the wrapped environment."""
        self.env.close()

    def render(self, render_mode):
        """Render the environment and return whatever the inner ``render`` returns.

        Args:
            render_mode: Forwarded as the ``mode`` keyword of the inner render call.
        """
        # Deliberately calls render two wrapper layers below ``self.env``.
        # NOTE(review): this depends on the exact wrapper nesting that
        # gym.make() produces - verify when changing the gym version.
        return self.env.env.env.render(mode=render_mode)

    def reset(self):
        """Reset the environment and the step counter.

        Returns:
            A tuple ``(obs_dict, info)`` where ``obs_dict`` maps each agent
            name to its initial observation and ``info`` is always an empty
            dict (the info returned by the wrapped environment is discarded).
        """
        raw_obs, _ = self.env.reset()
        raw_obs = np.array(raw_obs)
        obs_dict = {name: raw_obs[idx] for idx, name in enumerate(self.agents)}
        self._episode_step = 0
        return obs_dict, {}

    def step(self, actions):
        """Apply one joint action and return the per-agent results.

        Args:
            actions: Mapping from agent name to that agent's action; it must
                contain every name in ``self.agents``.

        Returns:
            A tuple ``(obs_dict, reward_dict, terminated_dict, truncated, info)``.
            The three dicts are keyed by agent name; ``truncated`` and ``info``
            are passed through unchanged from the wrapped environment.
        """
        # The wrapped environment expects actions ordered by agent index.
        ordered_actions = [actions[name] for name in self.agents]
        observation, reward, terminated, truncated, info = self.env.step(ordered_actions)

        obs_dict = {}
        reward_dict = {}
        terminated_dict = {}
        for idx, name in enumerate(self.agents):
            obs_dict[name] = observation[idx]
            reward_dict[name] = reward[idx]
            # The single termination flag is shared by all agents.
            terminated_dict[name] = terminated

        self._episode_step += 1  # advance the per-episode step counter
        return obs_dict, reward_dict, terminated_dict, truncated, info

    def state(self):
        """Return a global-state array drawn from ``self.state_space``.

        NOTE(review): this returns a random sample of the state space, not a
        value derived from the current observations - it looks like a
        placeholder; confirm that no consumer relies on a meaningful state.
        """
        return self.state_space.sample()