Getting Started With OpenAI Gym: Creating Custom Gym Environments

This post covers how to implement a custom environment in OpenAI Gym. As an example, we implement a custom environment that involves flying a Chopper (or a helicopter) while avoiding obstacles mid-air.

8 months ago   •   13 min read

By Ayoosh Kathuria

OpenAI Gym comes packed with a lot of awesome environments, ranging from environments featuring classic control tasks to ones that let you train your agents to play Atari games like Breakout, Pacman, and Seaquest. However, you may still have a task at hand that necessitates the creation of a custom environment that is not a part of the Gym package. Thankfully, Gym is flexible enough to allow you to do so and that's precisely the topic of this post.

In this post, we will be designing a custom environment that involves flying a Chopper (or a helicopter) while avoiding obstacles mid-air. Note that this is the second part of the OpenAI Gym series, and knowledge of the concepts introduced in Part 1 is assumed. If you haven't read Part 1, here is the link.

You can also run all of the code in this tutorial on a free GPU with a Gradient Community Notebook.

Dependencies/Imports

We begin by installing some important dependencies.

!pip install opencv-python 
!pip install pillow

We also start with the necessary imports.  

import numpy as np 
import cv2 
import matplotlib.pyplot as plt
import PIL.Image as Image
import gym
import random

from gym import Env, spaces
import time

font = cv2.FONT_HERSHEY_COMPLEX_SMALL 

Description of the Environment

The environment that we are creating is basically a game that is heavily inspired by the Dino Run game, the one which you play in Google Chrome if you are disconnected from the Internet. There is a dinosaur, and you have to jump over cacti and avoid hitting birds. The distance you cover is representative of the reward you end up getting.

In our game, instead of a dinosaur, our agent is going to be a Chopper pilot.

  1. The chopper has to cover as much distance as possible to get the maximum reward. There will be birds that the chopper has to avoid.
  2. The episode terminates in case of a bird strike. The episode can also terminate if the Chopper runs out of fuel.
  3. Just like birds, there are floating fuel tanks (yes, no points for being close to reality, I know!) which the Chopper can collect to refuel to its full capacity (which is fixed at 1000 L).

Note that this is going to be just a proof of concept and not the most aesthetically-pleasing game. However, in case you want to improve on it, this post will leave you with enough knowledge to do so!

The very first consideration while designing an environment is to decide what sort of observation space and action space we will be using.

  • The observation space can be either continuous or discrete. An example of a discrete observation space is that of a grid-world where the observation space is defined by cells, and the agent could be inside one of those cells. An example of a continuous observation space is one where the position of the agent is described by real-valued coordinates.
  • The action space can be either continuous or discrete as well. An example of a discrete space is one where each action corresponds to the particular behavior of the agent, but that behavior cannot be quantified. An example of this is Mario Bros, where each action would lead to moving left, right, jumping, etc. Your actions can't quantify the behavior being produced, i.e. you can jump but not jump high, higher, or lower. However, in a game like Angry Birds, you decide how much to stretch the slingshot (you quantify it).

ChopperScape Class

We begin by implementing the __init__ function of our environment class, ChopperScape. In the __init__ function, we will define the observation and the action spaces. In addition to that, we will also implement a few other attributes:

  1. canvas: This represents our observation image.
  2. x_min, y_min, x_max, y_max: This defines the legitimate area of our screen where various elements of the screen, such as the Chopper and birds, can be placed. Other areas are reserved for displaying info such as fuel left, rewards, and padding.
  3. elements: This stores the active elements present on the screen at any given time (like chopper, bird, etc.)
  4. max_fuel: Maximum fuel that the chopper can hold.
class ChopperScape(Env):
    def __init__(self):
        super(ChopperScape, self).__init__()
        

        
        # Define the observation space: a 600 x 800 image with 3 colour channels
        self.observation_shape = (600, 800, 3)
        self.observation_space = spaces.Box(low = np.zeros(self.observation_shape), 
                                            high = np.ones(self.observation_shape),
                                            dtype = np.float16)
    
        
        # Define an action space with 5 discrete actions, numbered 0 to 4
        self.action_space = spaces.Discrete(5)
                        
        # Create a canvas to render the environment images upon 
        self.canvas = np.ones(self.observation_shape) * 1
        
        # Define elements present inside the environment
        self.elements = []
        
        # Maximum fuel chopper can take at once
        self.max_fuel = 1000

        # Permissible area for the helicopter to be in
        self.y_min = int (self.observation_shape[0] * 0.1)
        self.x_min = 0
        self.y_max = int (self.observation_shape[0] * 0.9)
        self.x_max = self.observation_shape[1]
        

Elements of the Environment

Once we have determined the action space and the observation space, we need to finalize the elements of our environment. In our game, we have three distinct elements: the Chopper, Flying Birds, and Floating Fuel Stations. We will be implementing all of these as separate classes that inherit from a common base class called Point.

Point Base Class

The Point class is used to define any arbitrary point on our observation image. We define this class with the following attributes:

  • (x,y): Position of the point on the image.
  • (x_min, x_max, y_min, y_max): Permissible coordinates for the point. If we try to set the position of the point outside these limits, the position values are clamped to these limits.
  • name: Name of the point.

We define the following member functions for this class.

  • get_position: Get the coordinates of the point.
  • set_position: Set the coordinates of the point to a certain value.
  • move: Move the point by a given offset.
class Point(object):
    def __init__(self, name, x_max, x_min, y_max, y_min):
        self.x = 0
        self.y = 0
        self.x_min = x_min
        self.x_max = x_max
        self.y_min = y_min
        self.y_max = y_max
        self.name = name
    
    def set_position(self, x, y):
        self.x = self.clamp(x, self.x_min, self.x_max - self.icon_w)
        self.y = self.clamp(y, self.y_min, self.y_max - self.icon_h)
    
    def get_position(self):
        return (self.x, self.y)
    
    def move(self, del_x, del_y):
        self.x += del_x
        self.y += del_y
        
        self.x = self.clamp(self.x, self.x_min, self.x_max - self.icon_w)
        self.y = self.clamp(self.y, self.y_min, self.y_max - self.icon_h)

    def clamp(self, n, minn, maxn):
        return max(min(maxn, n), minn)
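
To make the clamping behaviour concrete, here is a small standalone sketch of the same clamp logic outside the class. The 800-pixel width and the 64-pixel icon match values used elsewhere in this post; the helper name clamp_x is only illustrative and is not part of the environment.

```python
def clamp(n, minn, maxn):
    """Restrict n to the closed interval [minn, maxn]."""
    return max(min(maxn, n), minn)

# Assumed values: an 800-pixel-wide screen and a 64-pixel-wide icon.
X_MIN, X_MAX, ICON_W = 0, 800, 64

def clamp_x(x):
    # The icon's left edge may go no further right than X_MAX - ICON_W,
    # so that the whole icon stays on the screen.
    return clamp(x, X_MIN, X_MAX - ICON_W)

print(clamp_x(-20))   # left of the screen -> 0
print(clamp_x(400))   # inside the screen  -> 400
print(clamp_x(790))   # too far right      -> 736
```

Subtracting the icon size from the upper limit is what keeps the full icon, and not just its top-left corner, inside the permissible area.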

Now we define the classes Chopper, Bird and Fuel. These classes are derived from the Point class, and introduce a set of new attributes:

  • icon: Icon of the point that will display on the  observation image when the game is rendered.
  • (icon_w, icon_h): Dimensions of the icon.

If you are viewing the Gradient notebook, the images used for icons are hosted along with the notebook.

class Chopper(Point):
    def __init__(self, name, x_max, x_min, y_max, y_min):
        super(Chopper, self).__init__(name, x_max, x_min, y_max, y_min)
        self.icon = cv2.imread("chopper.png") / 255.0
        self.icon_w = 64
        self.icon_h = 64
        # cv2.resize expects the target size as (width, height)
        self.icon = cv2.resize(self.icon, (self.icon_w, self.icon_h))

    
class Bird(Point):
    def __init__(self, name, x_max, x_min, y_max, y_min):
        super(Bird, self).__init__(name, x_max, x_min, y_max, y_min)
        self.icon = cv2.imread("bird.png") / 255.0
        self.icon_w = 32
        self.icon_h = 32
        # cv2.resize expects the target size as (width, height)
        self.icon = cv2.resize(self.icon, (self.icon_w, self.icon_h))
    
class Fuel(Point):
    def __init__(self, name, x_max, x_min, y_max, y_min):
        super(Fuel, self).__init__(name, x_max, x_min, y_max, y_min)
        self.icon = cv2.imread("fuel.png") / 255.0
        self.icon_w = 32
        self.icon_h = 32
        # cv2.resize expects the target size as (width, height)
        self.icon = cv2.resize(self.icon, (self.icon_w, self.icon_h))
    

Back to the ChopperScape Class

Recall from Part 1 that any gym Env class has two important functions:

  1. reset: Resets the environment to its initial state and returns the initial observation.
  2. step: Executes a step in the environment by applying an action. Returns the new observation, reward, completion status, and other info.

In this section, we will be implementing the reset and step functions of our environment along with many other helper functions. We begin with the reset function.

Reset Function

When we reset our environment, we need to reset all the state-based variables in our environment. These include things like fuel consumed, episodic return, and the elements present inside the environment.

In our case, when we reset our environment, we have nothing but the Chopper in the initial state. We initialize our chopper randomly in an area in the top-left of our image. This area is 5-10 percent of the image width and 15-20 percent of the image height.
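
As a quick sanity check of those percentages, the sketch below works out the pixel ranges for the 600 x 800 screen used in this post. The function name spawn_ranges is hypothetical, and integer arithmetic is used here purely to keep the numbers exact.

```python
import random

# Assumed screen size, taken from observation_shape = (600, 800, 3):
# rows (height) come first, columns (width) second.
HEIGHT, WIDTH = 600, 800

def spawn_ranges(height, width):
    """Return the half-open pixel ranges the chopper's start position is drawn from."""
    x_range = (width * 5 // 100, width * 10 // 100)     # 5-10% of the width
    y_range = (height * 15 // 100, height * 20 // 100)  # 15-20% of the height
    return x_range, y_range

x_range, y_range = spawn_ranges(HEIGHT, WIDTH)
print(x_range)  # (40, 80)
print(y_range)  # (90, 120)

# Draw one start position with random.randrange, which excludes the upper bound.
x = random.randrange(*x_range)
y = random.randrange(*y_range)
```

Both ranges lie inside the permissible area (y_min is 60 for this screen size), so the start position is never clamped.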

We also define a helper function called draw_elements_on_canvas that basically places the icons of each of the elements present in the game at their respective positions in the observation image. If the position is beyond the permissible range, then the icons are placed on the range boundaries. We also print important information such as the remaining fuel.

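Finally, we return the canvas on which the elements have been placed as the observation. The %%add_to cell magic used in the code cells below attaches functions to the existing ChopperScape class from a separate notebook cell; it comes from the jdc package, so that package needs to be installed and imported (import jdc) before these cells will run.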

%%add_to ChopperScape

def draw_elements_on_canvas(self):
    # Init the canvas 
    self.canvas = np.ones(self.observation_shape) * 1

    # Draw each element's icon on the canvas
    for elem in self.elements:
        elem_shape = elem.icon.shape
        x,y = elem.x, elem.y
        # icon.shape is (height, width, channels): rows are indexed by y, columns by x
        self.canvas[y : y + elem_shape[0], x : x + elem_shape[1]] = elem.icon

    text = 'Fuel Left: {} | Rewards: {}'.format(self.fuel_left, self.ep_return)

    # Put the info on canvas 
    self.canvas = cv2.putText(self.canvas, text, (10,20), font,  
               0.8, (0,0,0), 1, cv2.LINE_AA)

def reset(self):
    # Reset the fuel consumed
    self.fuel_left = self.max_fuel

    # Reset the reward
    self.ep_return  = 0

    # Number of birds and fuel tanks spawned so far
    self.bird_count = 0
    self.fuel_count = 0

    # Determine a place to initialise the chopper in.
    # observation_shape is (height, width, channels), so x uses index 1 and y uses index 0.
    x = random.randrange(int(self.observation_shape[1] * 0.05), int(self.observation_shape[1] * 0.10))
    y = random.randrange(int(self.observation_shape[0] * 0.15), int(self.observation_shape[0] * 0.20))
    
    # Initialise the chopper
    self.chopper = Chopper("chopper", self.x_max, self.x_min, self.y_max, self.y_min)
    self.chopper.set_position(x,y)

    # Initialise the elements 
    self.elements = [self.chopper]

    # Reset the Canvas 
    self.canvas = np.ones(self.observation_shape) * 1

    # Draw elements on the canvas
    self.draw_elements_on_canvas()


    # return the observation
    return self.canvas 

Before we proceed further, let us now see what our initial observation looks like.

env = ChopperScape()
obs = env.reset()
plt.imshow(obs)

Since our observation is the same as the gameplay screen, our render function returns the observation too. We support two modes: human, which renders the game in a pop-up window, and rgb_array, which returns the frame as a pixel array.

%%add_to ChopperScape

def render(self, mode = "human"):
    assert mode in ["human", "rgb_array"], "Invalid mode, must be either \"human\" or \"rgb_array\""
    if mode == "human":
        cv2.imshow("Game", self.canvas)
        cv2.waitKey(10)
    
    elif mode == "rgb_array":
        return self.canvas
    
def close(self):
    cv2.destroyAllWindows()

env = ChopperScape()
obs = env.reset()
screen = env.render(mode = "rgb_array")
plt.imshow(screen)

Step Function

Now that we have the reset function out of the way, we begin work on implementing the step function, which will contain the code to transition our environment from one state to the next given an action. In many ways, this section is the proverbial meat of our environment, and this is where most of the planning goes.

We first need to list the things that need to happen in one transition step of the environment. These can be broken down into two parts:

  1. Applying actions to our agent.
  2. Everything else that happens in the environment, such as the behaviour of the non-RL actors (e.g. birds and floating fuel tanks).

So let's first focus on (1). We provide actions to the game that control what our chopper does. We have 5 actions: move right, move left, move down, move up, and do nothing, denoted by 0, 1, 2, 3, and 4, respectively.

We define a member function called get_action_meanings() that will tell us what integer each action is mapped to for our reference.

%%add_to ChopperScape

def get_action_meanings(self):
    return {0: "Right", 1: "Left", 2: "Down", 3: "Up", 4: "Do Nothing"}

We also validate the action being passed by checking whether it's present in the action space. If it is not, the assertion fails and an AssertionError is raised.

# Assert that it is a valid action 
assert self.action_space.contains(action), "Invalid Action"

Once that is done, we accordingly change the position of the chopper using the move function we defined earlier. Each action results in movement by 5 coordinates in the respective directions.

# apply the action to the chopper
# move takes (del_x, del_y); in image coordinates, y grows downward
if action == 0:
    self.chopper.move(5,0)     # Right
elif action == 1:
    self.chopper.move(-5,0)    # Left
elif action == 2:
    self.chopper.move(0,5)     # Down
elif action == 3:
    self.chopper.move(0,-5)    # Up
elif action == 4:
    self.chopper.move(0,0)     # Do Nothing
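
As an alternative to the if/elif chain, the mapping from action to movement can be written as a lookup table. This is only a sketch: ACTION_DELTAS and apply_action are hypothetical names, the directions follow the mapping returned by get_action_meanings, and image coordinates are assumed (x grows to the right, y grows downward).

```python
# Hypothetical lookup table: action id -> (del_x, del_y) in pixels.
# Image coordinates are assumed, so "Down" is +y and "Up" is -y.
STEP = 5
ACTION_DELTAS = {
    0: (STEP, 0),    # Right
    1: (-STEP, 0),   # Left
    2: (0, STEP),    # Down
    3: (0, -STEP),   # Up
    4: (0, 0),       # Do Nothing
}

def apply_action(position, action):
    """Return the new (x, y) after applying an action, without clamping."""
    if action not in ACTION_DELTAS:
        raise ValueError("Invalid action: {}".format(action))
    del_x, del_y = ACTION_DELTAS[action]
    x, y = position
    return (x + del_x, y + del_y)

print(apply_action((100, 100), 0))  # (105, 100)
print(apply_action((100, 100), 3))  # (100, 95)
```

Inside the environment, the same table could feed the existing move function, which would then take care of clamping.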

Now that we have taken care of applying the action to the chopper, we focus on the other elements of the environment:

  • Birds spawn randomly at the right edge of the screen with a probability of 1% per frame (i.e. on average, one bird appears every hundred frames). Each bird moves 5 coordinate points to the left every frame. If a bird hits the Chopper, the game ends. Otherwise, it disappears from the game once it reaches the left edge.
  • Fuel tanks spawn randomly at the bottom edge of the screen with a probability of 1% per frame (i.e. on average, one fuel tank appears every hundred frames). Each fuel tank moves 5 coordinate points up every frame. If a tank hits the Chopper, the Chopper is refuelled to its full capacity. Otherwise, it disappears from the game once it reaches the top edge.
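
To get a feel for these numbers, here is a rough standalone sketch of the spawn rule and travel time. The function names are made up for illustration, and the 800-pixel width is the screen width assumed throughout this post.

```python
import random

def expected_spawns(frames, prob):
    """Average number of spawns over the given number of frames."""
    return frames * prob

def simulate_spawns(frames, prob, seed=0):
    """Count spawns in one simulated run, with one independent draw per frame."""
    rng = random.Random(seed)  # seeded so the run is repeatable
    return sum(1 for _ in range(frames) if rng.random() < prob)

def frames_to_cross(distance, speed=5):
    """Frames needed to travel `distance` pixels at `speed` pixels per frame."""
    return distance // speed

print(expected_spawns(1000, 0.01))  # about 10 birds per 1000 frames, on average
print(simulate_spawns(1000, 0.01))  # one particular run; the count varies with the seed
print(frames_to_cross(800))         # 160 frames to cross an 800-pixel-wide screen
```

Since a bird needs 160 frames to cross the screen and a new one appears roughly every 100 frames, only a handful of birds are typically on screen at once.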

In order to implement the features outlined above, we need a helper function that determines whether two Point objects (such as a Chopper/Bird or Chopper/Fuel Tank pair) have collided. How do we define a collision? We say that two points have collided when, along both the x and the y axis, the distance between their centers is at most half of the sum of their dimensions along that axis. We call this function has_collided.

%%add_to ChopperScape

def has_collided(self, elem1, elem2):
    x_col = False
    y_col = False

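    elem1_x, elem1_y = elem1.get_position()
    elem2_x, elem2_y = elem2.get_position()

    # get_position returns the top-left corner, so shift by half the icon size
    # to compare the centers of the two icons.
    elem1_cx, elem1_cy = elem1_x + elem1.icon_w / 2, elem1_y + elem1.icon_h / 2
    elem2_cx, elem2_cy = elem2_x + elem2.icon_w / 2, elem2_y + elem2.icon_h / 2

    if 2 * abs(elem1_cx - elem2_cx) <= (elem1.icon_w + elem2.icon_w):
        x_col = True

    if 2 * abs(elem1_cy - elem2_cy) <= (elem1.icon_h + elem2.icon_h):
        y_col = True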

    if x_col and y_col:
        return True

    return False
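
To see the center-based rule at work on concrete numbers, here is a minimal standalone sketch that does not depend on the Point classes. The function name boxes_overlap and the sample positions are hypothetical; the 64-pixel and 32-pixel sizes match the chopper and bird icons.

```python
def boxes_overlap(pos1, size1, pos2, size2):
    """Check whether two axis-aligned boxes touch or overlap.

    pos is the (x, y) of the top-left corner, size is (width, height).
    """
    (x1, y1), (w1, h1) = pos1, size1
    (x2, y2), (w2, h2) = pos2, size2

    # Work with doubled center coordinates to stay in integer arithmetic.
    dx2 = abs((2 * x1 + w1) - (2 * x2 + w2))
    dy2 = abs((2 * y1 + h1) - (2 * y2 + h2))

    # Overlap on an axis: center distance <= half the sum of the extents.
    return dx2 <= (w1 + w2) and dy2 <= (h1 + h2)

chopper = ((100, 100), (64, 64))
print(boxes_overlap(*chopper, (150, 120), (32, 32)))  # True: the boxes overlap
print(boxes_overlap(*chopper, (300, 120), (32, 32)))  # False: far apart horizontally
```

Both axes must overlap for a collision; overlap along only one axis means the icons merely share a row or a column.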

Apart from this, we have to do some book-keeping. The reward for each step is 1, so the episodic return counter is incremented by 1 at every step. If there is a collision, the reward is -10 and the episode terminates. The fuel counter is reduced by 1 at every step.
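
The reward scheme can be checked with a little arithmetic. The sketch below sums the per-step rewards for two illustrative episodes; the function name and the 200-step example are hypothetical, and only the reward values and fuel capacity come from the text.

```python
STEP_REWARD = 1
COLLISION_REWARD = -10
MAX_FUEL = 1000

def episode_rewards(num_steps, ends_in_collision):
    """Per-step rewards for an episode of num_steps steps."""
    rewards = [STEP_REWARD] * num_steps
    if ends_in_collision and num_steps > 0:
        rewards[-1] = COLLISION_REWARD  # the final step is the bird strike
    return rewards

# A chopper that never refuels and never crashes runs dry after MAX_FUEL steps.
print(sum(episode_rewards(MAX_FUEL, ends_in_collision=False)))  # 1000

# A chopper that hits a bird on its 200th step.
print(sum(episode_rewards(200, ends_in_collision=True)))        # 189
```

Note that this sums the rewards returned by step; the on-screen counter is tracked separately by the environment.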

Finally, we implement our step function. I've written extensive comments to guide you through it.

%%add_to ChopperScape

def step(self, action):
    # Flag that marks the termination of an episode
    done = False
    
    # Assert that it is a valid action 
    assert self.action_space.contains(action), "Invalid Action"

    # Decrease the fuel counter 
    self.fuel_left -= 1 
    
    # Reward for executing a step.
    reward = 1      

    # apply the action to the chopper
    # move takes (del_x, del_y); in image coordinates, y grows downward
    if action == 0:
        self.chopper.move(5,0)     # Right
    elif action == 1:
        self.chopper.move(-5,0)    # Left
    elif action == 2:
        self.chopper.move(0,5)     # Down
    elif action == 3:
        self.chopper.move(0,-5)    # Up
    elif action == 4:
        self.chopper.move(0,0)     # Do Nothing

    # Spawn a bird at the right edge with prob 0.01
    if random.random() < 0.01:
        
        # Spawn a bird
        spawned_bird = Bird("bird_{}".format(self.bird_count), self.x_max, self.x_min, self.y_max, self.y_min)
        self.bird_count += 1

        # Compute the x,y co-ordinates of the position from where the bird has to be spawned
        # Horizontally, the position is on the right edge and vertically, the height is randomly 
        # sampled from the set of permissible values
        bird_x = self.x_max 
        bird_y = random.randrange(self.y_min, self.y_max)
        spawned_bird.set_position(bird_x, bird_y)
        
        # Append the spawned bird to the elements currently present in Env. 
        self.elements.append(spawned_bird)    

    # Spawn a fuel at the bottom edge with prob 0.01
    if random.random() < 0.01:
        # Spawn a fuel tank
        spawned_fuel = Fuel("fuel_{}".format(self.fuel_count), self.x_max, self.x_min, self.y_max, self.y_min)
        self.fuel_count += 1
        
        # Compute the x,y co-ordinates of the position from where the fuel tank has to be spawned
        # Horizontally, the position is randomly chosen from the list of permissible values and 
        # vertically, the position is on the bottom edge
        fuel_x = random.randrange(self.x_min, self.x_max)
        fuel_y = self.y_max
        spawned_fuel.set_position(fuel_x, fuel_y)
        
        # Append the spawned fuel tank to the elements currently present in the Env.
        self.elements.append(spawned_fuel)   

    # For elements in the Env (iterate over a copy, since elements may be removed)
    for elem in list(self.elements):
        if isinstance(elem, Bird):
            # If the bird has reached the left edge, remove it from the Env
            if elem.get_position()[0] <= self.x_min:
                self.elements.remove(elem)
                continue

            # Otherwise, move the bird left by 5 pts.
            elem.move(-5,0)
            
            # If the bird has collided.
            if self.has_collided(self.chopper, elem):
                # Conclude the episode and remove the chopper from the Env.
                done = True
                reward = -10
                # Guard against a second bird hitting in the same step
                if self.chopper in self.elements:
                    self.elements.remove(self.chopper)

        if isinstance(elem, Fuel):
            # If the fuel tank has reached the top, remove it from the Env
            if elem.get_position()[1] <= self.y_min:
                self.elements.remove(elem)
                continue

            # Otherwise, move the tank up by 5 pts.
            elem.move(0, -5)
                
            # If the fuel tank has collided with the chopper.
            if self.has_collided(self.chopper, elem):
                # Remove the fuel tank from the env.
                self.elements.remove(elem)
                
                # Fill the fuel tank of the chopper to full.
                self.fuel_left = self.max_fuel
    
    # Increment the episodic return
    self.ep_return += 1

    # Draw elements on the canvas
    self.draw_elements_on_canvas()

    # If out of fuel, end the episode.
    if self.fuel_left == 0:
        done = True

    # info is expected to be a dict; we have nothing extra to report
    return self.canvas, reward, done, {}
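
The step function above follows the classic Gym interface and returns four values: observation, reward, done, and info. Newer Gym releases (0.26 and later) and Gymnasium expect five values, splitting done into terminated and truncated. If you need to bridge the two, a rough sketch of a converter might look like the following; the function name is hypothetical, and it assumes the environment never truncates an episode.

```python
def to_five_tuple(step_result):
    """Convert (obs, reward, done, info) into (obs, reward, terminated, truncated, info)."""
    obs, reward, done, info = step_result
    # Assumption: every episode end here is a true termination (bird strike or
    # empty tank), so truncated is always False.
    terminated, truncated = bool(done), False
    # Make sure info is a dict, which is what the interface expects.
    info = dict(info) if info else {}
    return obs, reward, terminated, truncated, info

print(to_five_tuple(("obs", 1, False, {})))   # ('obs', 1, False, False, {})
print(to_five_tuple(("obs", -10, True, [])))  # ('obs', -10, True, False, {})
```

The string "obs" stands in for the canvas here so that the example runs without the environment.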

Seeing It in Action

This concludes the code for our environment. Now let's execute some steps in the environment using an agent that takes random actions!

from IPython import display

env = ChopperScape()
obs = env.reset()


while True:
    # Take a random action
    action = env.action_space.sample()
    obs, reward, done, info = env.step(action)
    
    # Render the game
    env.render()
    
    if done:
        break

env.close()

Figure: Rendering the Environment

Conclusion

That's it for this part, folks. I hope this tutorial gave you some insight into some of the considerations and design decisions that go into designing a custom OpenAI environment. You can now try creating an environment of your choice, or if you're so inclined, you can make several improvements to the one we just designed for practice. Some suggestions right off the bat are:

  1. Instead of the episode terminating upon the first bird strike, you can implement multiple lives for the chopper.
  2. Design an evil alien race of mutated birds that are also able to fire missiles at the chopper, and the chopper has to avoid them.
  3. Do something about when a fuel tank and a bird collide!

With these suggestions, it's a wrap. Happy coding!
