Source code for gym_donkeycar.envs.donkey_sim

'''
file: donkey_sim.py
author: Tawn Kramer
date: 2018-08-31
'''

import time
from io import BytesIO
import math
import logging
from threading import Thread
import asyncore
import base64

import numpy as np
from PIL import Image

from gym_donkeycar.core.fps import FPSTimer
from gym_donkeycar.core.tcp_server import IMesgHandler, SimServer
from gym_donkeycar.envs.donkey_ex import SimFailed

logger = logging.getLogger(__name__)


[docs]class DonkeyUnitySimContoller(): def __init__(self, level, time_step=0.05, hostname='0.0.0.0', port=9090, max_cte=5.0, loglevel='INFO', cam_resolution=(120, 160, 3)): logger.setLevel(loglevel) self.address = (hostname, port) self.handler = DonkeyUnitySimHandler( level, time_step=time_step, max_cte=max_cte, cam_resolution=cam_resolution) try: self.server = SimServer(self.address, self.handler) except OSError: raise SimFailed("failed to listen on address %s" % self.address) self.thread = Thread(target=asyncore.loop) self.thread.daemon = True self.thread.start()
[docs] def wait_until_loaded(self): while not self.handler.loaded: logger.warning("waiting for sim to start..") time.sleep(3.0)
[docs] def reset(self): self.handler.reset()
[docs] def get_sensor_size(self): return self.handler.get_sensor_size()
[docs] def take_action(self, action): self.handler.take_action(action)
[docs] def observe(self): return self.handler.observe()
[docs] def quit(self): pass
[docs] def render(self, mode): pass
[docs] def is_game_over(self): return self.handler.is_game_over()
[docs] def calc_reward(self, done): return self.handler.calc_reward(done)
[docs]class DonkeyUnitySimHandler(IMesgHandler): def __init__(self, level, time_step=0.05, max_cte=5.0, cam_resolution=None): self.iSceneToLoad = level self.time_step = time_step self.wait_time_for_obs = 0.1 self.sock = None self.loaded = False self.max_cte = max_cte self.timer = FPSTimer() # sensor size - height, width, depth self.camera_img_size = cam_resolution self.image_array = np.zeros(self.camera_img_size) self.last_obs = None self.hit = "none" self.cte = 0.0 self.x = 0.0 self.y = 0.0 self.z = 0.0 self.speed = 0.0 self.over = False self.fns = {'telemetry': self.on_telemetry, "scene_selection_ready": self.on_scene_selection_ready, "scene_names": self.on_recv_scene_names, "car_loaded": self.on_car_loaded}
[docs] def on_connect(self, socketHandler): self.sock = socketHandler
[docs] def on_disconnect(self): self.sock = None
[docs] def on_recv_message(self, message): if 'msg_type' not in message: logger.error('expected msg_type field') return msg_type = message['msg_type'] if msg_type in self.fns: self.fns[msg_type](message) else: logger.warning(f'unknown message type {msg_type}')
## ------- Env interface ---------- ##
[docs] def reset(self): logger.debug("reseting") self.image_array = np.zeros(self.camera_img_size) self.last_obs = self.image_array self.hit = "none" self.cte = 0.0 self.x = 0.0 self.y = 0.0 self.z = 0.0 self.speed = 0.0 self.over = False self.send_reset_car() self.timer.reset() time.sleep(1)
[docs] def get_sensor_size(self): return self.camera_img_size
[docs] def take_action(self, action): self.send_control(action[0], action[1])
[docs] def observe(self): while self.last_obs is self.image_array: time.sleep(1.0 / 120.0) self.last_obs = self.image_array observation = self.image_array done = self.is_game_over() reward = self.calc_reward(done) info = {'pos': (self.x, self.y, self.z), 'cte': self.cte, "speed": self.speed, "hit": self.hit} self.timer.on_frame() return observation, reward, done, info
[docs] def is_game_over(self): return self.over
## ------ RL interface ----------- ##
[docs] def calc_reward(self, done): if done: return -1.0 if self.cte > self.max_cte: return -1.0 if self.hit != "none": return -2.0 # going fast close to the center of lane yeilds best reward return 1.0 - (self.cte / self.max_cte) * self.speed
## ------ Socket interface ----------- ##
[docs] def on_telemetry(self, data): imgString = data["image"] image = Image.open(BytesIO(base64.b64decode(imgString))) # always update the image_array as the observation loop will hang if not changing. self.image_array = np.asarray(image) self.x = data["pos_x"] self.y = data["pos_y"] self.z = data["pos_z"] self.speed = data["speed"] # Cross track error not always present. # Will be missing if path is not setup in the given scene. # It should be setup in the 4 scenes available now. if "cte" in data: self.cte = data["cte"] # don't update hit once session over if self.over: return self.hit = data["hit"] self.determine_episode_over()
[docs] def determine_episode_over(self): # we have a few initial frames on start that are sometimes very large CTE when it's behind # the path just slightly. We ignore those. if math.fabs(self.cte) > 2 * self.max_cte: pass elif math.fabs(self.cte) > self.max_cte: logger.debug(f"game over: cte {self.cte}") self.over = True elif self.hit != "none": logger.debug(f"game over: hit {self.hit}") self.over = True
[docs] def on_scene_selection_ready(self, data): logger.debug("SceneSelectionReady ") self.send_get_scene_names()
[docs] def on_car_loaded(self, data): logger.debug("car loaded") self.loaded = True
[docs] def on_recv_scene_names(self, data): if data: names = data['scene_names'] logger.debug(f"SceneNames: {names}") self.send_load_scene(names[self.iSceneToLoad])
[docs] def send_control(self, steer, throttle): if not self.loaded: return msg = {'msg_type': 'control', 'steering': steer.__str__( ), 'throttle': throttle.__str__(), 'brake': '0.0'} self.queue_message(msg)
[docs] def send_reset_car(self): msg = {'msg_type': 'reset_car'} self.queue_message(msg)
[docs] def send_get_scene_names(self): msg = {'msg_type': 'get_scene_names'} self.queue_message(msg)
[docs] def send_load_scene(self, scene_name): msg = {'msg_type': 'load_scene', 'scene_name': scene_name} self.queue_message(msg)
[docs] def queue_message(self, msg): if self.sock is None: logger.debug(f'skiping: \n {msg}') return logger.debug(f'sending \n {msg}') self.sock.queue_message(msg)