Source code for gym_donkeycar.envs.donkey_env
'''
file: donkey_env.py
author: Tawn Kramer
date: 2018-08-31
'''
import os
import random
import time
import numpy as np
import gym
from gym import spaces
from gym.utils import seeding
from gym_donkeycar.envs.donkey_sim import DonkeyUnitySimContoller
from gym_donkeycar.envs.donkey_proc import DonkeyUnityProcess
[docs]class DonkeyEnv(gym.Env):
"""
OpenAI Gym Environment for Donkey
"""
metadata = {
"render.modes": ["human", "rgb_array"],
}
ACTION_NAMES = ["steer", "throttle"]
STEER_LIMIT_LEFT = -1.0
STEER_LIMIT_RIGHT = 1.0
THROTTLE_MIN = 0.0
THROTTLE_MAX = 5.0
VAL_PER_PIXEL = 255
def __init__(self, level, time_step=0.05, frame_skip=2):
print("starting DonkeyGym env")
# start Unity simulation subprocess
self.proc = DonkeyUnityProcess()
try:
exe_path = os.environ['DONKEY_SIM_PATH']
except:
print("Missing DONKEY_SIM_PATH environment var. you must start sim manually")
exe_path = "self_start"
try:
port_offset = 0
# if more than one sim running on same machine set DONKEY_SIM_MULTI = 1
random_port = os.environ['DONKEY_SIM_MULTI'] == '1'
if random_port:
port_offset = random.randint(0, 1000)
except:
pass
try:
port = int(os.environ['DONKEY_SIM_PORT']) + port_offset
except:
port = 9091 + port_offset
print("Missing DONKEY_SIM_PORT environment var. Using default:", port)
try:
headless = os.environ['DONKEY_SIM_HEADLESS'] == '1'
except:
print("Missing DONKEY_SIM_HEADLESS environment var. Using defaults")
headless = False
self.proc.start(exe_path, headless=headless, port=port)
# start simulation com
self.viewer = DonkeyUnitySimContoller(
level=level, time_step=time_step, port=port)
# steering and throttle
self.action_space = spaces.Box(low=np.array([self.STEER_LIMIT_LEFT, self.THROTTLE_MIN]),
high=np.array([self.STEER_LIMIT_RIGHT, self.THROTTLE_MAX]), dtype=np.float32)
# camera sensor data
self.observation_space = spaces.Box(
0, self.VAL_PER_PIXEL, self.viewer.get_sensor_size(), dtype=np.uint8)
# simulation related variables.
self.seed()
# Frame Skipping
self.frame_skip = frame_skip
# wait until loaded
self.viewer.wait_until_loaded()
def __del__(self):
self.close()
[docs] def close(self):
self.proc.quit()
[docs] def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
[docs] def step(self, action):
for i in range(self.frame_skip):
self.viewer.take_action(action)
observation, reward, done, info = self.viewer.observe()
return observation, reward, done, info
[docs] def reset(self):
self.viewer.reset()
observation, reward, done, info = self.viewer.observe()
time.sleep(1)
return observation
[docs] def render(self, mode="human", close=False):
if close:
self.viewer.quit()
return self.viewer.render(mode)
[docs] def is_game_over(self):
return self.viewer.is_game_over()
## ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ##
[docs]class GeneratedRoadsEnv(DonkeyEnv):
def __init__(self):
super(GeneratedRoadsEnv, self).__init__(level=0)
[docs]class WarehouseEnv(DonkeyEnv):
def __init__(self):
super(WarehouseEnv, self).__init__(level=1)
[docs]class AvcSparkfunEnv(DonkeyEnv):
def __init__(self):
super(AvcSparkfunEnv, self).__init__(level=2)
[docs]class GeneratedTrackEnv(DonkeyEnv):
def __init__(self):
super(GeneratedTrackEnv, self).__init__(level=3)