'''
file: donkey_sim.py
author: Tawn Kramer
date: 2018-08-31
'''
import time
from io import BytesIO
import math
import logging
from threading import Thread
import asyncore
import base64
import numpy as np
from PIL import Image
from gym_donkeycar.core.fps import FPSTimer
from gym_donkeycar.core.tcp_server import IMesgHandler, SimServer
from gym_donkeycar.envs.donkey_ex import SimFailed
logger = logging.getLogger(__name__)
[docs]class DonkeyUnitySimContoller():
def __init__(self, level, time_step=0.05, hostname='0.0.0.0',
port=9090, max_cte=5.0, loglevel='INFO', cam_resolution=(120, 160, 3)):
logger.setLevel(loglevel)
self.address = (hostname, port)
self.handler = DonkeyUnitySimHandler(
level, time_step=time_step, max_cte=max_cte,
cam_resolution=cam_resolution)
try:
self.server = SimServer(self.address, self.handler)
except OSError:
raise SimFailed("failed to listen on address %s" % self.address)
self.thread = Thread(target=asyncore.loop)
self.thread.daemon = True
self.thread.start()
[docs] def wait_until_loaded(self):
while not self.handler.loaded:
logger.warning("waiting for sim to start..")
time.sleep(3.0)
[docs] def reset(self):
self.handler.reset()
[docs] def get_sensor_size(self):
return self.handler.get_sensor_size()
[docs] def take_action(self, action):
self.handler.take_action(action)
[docs] def observe(self):
return self.handler.observe()
[docs] def render(self, mode):
pass
[docs] def is_game_over(self):
return self.handler.is_game_over()
[docs] def calc_reward(self, done):
return self.handler.calc_reward(done)
[docs]class DonkeyUnitySimHandler(IMesgHandler):
def __init__(self, level, time_step=0.05, max_cte=5.0, cam_resolution=None):
self.iSceneToLoad = level
self.time_step = time_step
self.wait_time_for_obs = 0.1
self.sock = None
self.loaded = False
self.max_cte = max_cte
self.timer = FPSTimer()
# sensor size - height, width, depth
self.camera_img_size = cam_resolution
self.image_array = np.zeros(self.camera_img_size)
self.last_obs = None
self.hit = "none"
self.cte = 0.0
self.x = 0.0
self.y = 0.0
self.z = 0.0
self.speed = 0.0
self.over = False
self.fns = {'telemetry': self.on_telemetry,
"scene_selection_ready": self.on_scene_selection_ready,
"scene_names": self.on_recv_scene_names,
"car_loaded": self.on_car_loaded}
[docs] def on_connect(self, socketHandler):
self.sock = socketHandler
[docs] def on_disconnect(self):
self.sock = None
[docs] def on_recv_message(self, message):
if 'msg_type' not in message:
logger.error('expected msg_type field')
return
msg_type = message['msg_type']
if msg_type in self.fns:
self.fns[msg_type](message)
else:
logger.warning(f'unknown message type {msg_type}')
## ------- Env interface ---------- ##
[docs] def reset(self):
logger.debug("reseting")
self.image_array = np.zeros(self.camera_img_size)
self.last_obs = self.image_array
self.hit = "none"
self.cte = 0.0
self.x = 0.0
self.y = 0.0
self.z = 0.0
self.speed = 0.0
self.over = False
self.send_reset_car()
self.timer.reset()
time.sleep(1)
[docs] def get_sensor_size(self):
return self.camera_img_size
[docs] def take_action(self, action):
self.send_control(action[0], action[1])
[docs] def observe(self):
while self.last_obs is self.image_array:
time.sleep(1.0 / 120.0)
self.last_obs = self.image_array
observation = self.image_array
done = self.is_game_over()
reward = self.calc_reward(done)
info = {'pos': (self.x, self.y, self.z), 'cte': self.cte,
"speed": self.speed, "hit": self.hit}
self.timer.on_frame()
return observation, reward, done, info
[docs] def is_game_over(self):
return self.over
## ------ RL interface ----------- ##
[docs] def calc_reward(self, done):
if done:
return -1.0
if self.cte > self.max_cte:
return -1.0
if self.hit != "none":
return -2.0
# going fast close to the center of lane yeilds best reward
return 1.0 - (self.cte / self.max_cte) * self.speed
## ------ Socket interface ----------- ##
[docs] def on_telemetry(self, data):
imgString = data["image"]
image = Image.open(BytesIO(base64.b64decode(imgString)))
# always update the image_array as the observation loop will hang if not changing.
self.image_array = np.asarray(image)
self.x = data["pos_x"]
self.y = data["pos_y"]
self.z = data["pos_z"]
self.speed = data["speed"]
# Cross track error not always present.
# Will be missing if path is not setup in the given scene.
# It should be setup in the 4 scenes available now.
if "cte" in data:
self.cte = data["cte"]
# don't update hit once session over
if self.over:
return
self.hit = data["hit"]
self.determine_episode_over()
[docs] def determine_episode_over(self):
# we have a few initial frames on start that are sometimes very large CTE when it's behind
# the path just slightly. We ignore those.
if math.fabs(self.cte) > 2 * self.max_cte:
pass
elif math.fabs(self.cte) > self.max_cte:
logger.debug(f"game over: cte {self.cte}")
self.over = True
elif self.hit != "none":
logger.debug(f"game over: hit {self.hit}")
self.over = True
[docs] def on_scene_selection_ready(self, data):
logger.debug("SceneSelectionReady ")
self.send_get_scene_names()
[docs] def on_car_loaded(self, data):
logger.debug("car loaded")
self.loaded = True
[docs] def on_recv_scene_names(self, data):
if data:
names = data['scene_names']
logger.debug(f"SceneNames: {names}")
self.send_load_scene(names[self.iSceneToLoad])
[docs] def send_control(self, steer, throttle):
if not self.loaded:
return
msg = {'msg_type': 'control', 'steering': steer.__str__(
), 'throttle': throttle.__str__(), 'brake': '0.0'}
self.queue_message(msg)
[docs] def send_reset_car(self):
msg = {'msg_type': 'reset_car'}
self.queue_message(msg)
[docs] def send_get_scene_names(self):
msg = {'msg_type': 'get_scene_names'}
self.queue_message(msg)
[docs] def send_load_scene(self, scene_name):
msg = {'msg_type': 'load_scene', 'scene_name': scene_name}
self.queue_message(msg)
[docs] def queue_message(self, msg):
if self.sock is None:
logger.debug(f'skiping: \n {msg}')
return
logger.debug(f'sending \n {msg}')
self.sock.queue_message(msg)