Multithreading on the Raspberry Pi is an excellent way to keep the main thread of your program running as efficiently as possible. To demonstrate, I show how you can move game controller polling routines to a separate thread. Find the Python example code for robot control at the end of this feature.
In the last article featuring this controller, I included example code to show how you might interface with the device using Python. Unfortunately, when using the inputs Python package, the program only runs one program cycle between controller event updates. This blocking means that the program cannot perform other tasks while the controller is inactive. To fix this, I am going to add multithreading to the new example code, which I include at the end of this feature.
Despite the code execution blocking of the Python inputs package, I was still able to code an excellent remote control robot vehicle. Variable speed and steering control were very responsive with no noticeable lag. And I can probably add a lot more functions to the code without the need to use multithreading. However, it will not be possible to add tasks that you expect to run during controller idle times. For example:
- Robot sensor scans and updates to a display cannot be made during controller idle times.
- An autonomous robot control function will not be able to poll events from an idle controller without becoming blocked.
- It will not be possible to switch from an active autonomous function to manual control unless the autonomous function itself polls controller events.
- OpenCV applications can be less efficient if processing is restricted to fit between controller event polling.
Multithreading Performance
The htop Terminal application, shown above, can help check the multithreading performance of your Python application. You can install it by entering the following command at the Terminal command prompt of the Raspberry Pi:
sudo apt-get install htop
To run the program just enter the following:
htop
Then open a second Terminal session to execute a Python program.
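As a complement to htop, the program can also report its own threads. The following is a minimal sketch using only the standard library; the thread name "gamepad-poller" and the idle_worker function are hypothetical and are not part of the example program in this article.

```python
import threading


def idle_worker(stop_event):
    # Hypothetical worker that simply waits until it is told to stop.
    stop_event.wait()


stop_event = threading.Event()
worker = threading.Thread(
    target=idle_worker,
    args=(stop_event,),
    name="gamepad-poller",  # assumed name, chosen for illustration only
    daemon=True,
)
worker.start()

# List the names of every thread that is currently alive in this process.
thread_names = [t.name for t in threading.enumerate()]
print(thread_names)

# Let the worker finish so the program exits cleanly.
stop_event.set()
worker.join(timeout=1.0)
```

Giving each thread a descriptive name makes it easier to tell the polling thread from the main thread when debugging.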
Multithreading
Ideally, I want the main thread of my program to run as efficiently as possible while looping continuously, executing function after function. However, connected devices such as game controllers, LCDs and many kinds of external sensors can drastically slow down or block the main program. This slowdown is due to slow IO interfaces rather than processor utilisation limits.
The game controller can restrict the scope of my robot programming in ways explained above. If I use multithreading programming techniques, I can run the game controller class methods in a separate thread from the main program. I can then use custom class methods to poll the game controller indirectly, thus avoiding the code execution blocking methods of the game controller class.
Running IO devices in separate program threads leaves more processor time available to the program’s main thread.
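To illustrate the idea, here is a minimal sketch in which a simulated slow IO device runs in its own thread while the main loop keeps cycling. The slow_io_worker function and the 50 ms delay are assumptions standing in for a real device; nothing here uses the inputs package.

```python
import threading
import time


def slow_io_worker(results, stop_event):
    # Hypothetical slow IO device: each read takes about 50 ms.
    while not stop_event.is_set():
        time.sleep(0.05)
        results.append("reading")


results = []
stop_event = threading.Event()
worker = threading.Thread(
    target=slow_io_worker, args=(results, stop_event), daemon=True
)
worker.start()

# The main loop keeps running while the worker waits on its slow device.
main_cycles = 0
deadline = time.monotonic() + 0.2
while time.monotonic() < deadline:
    main_cycles += 1
    time.sleep(0.001)

# Ask the worker to stop and wait for it to finish its current read.
stop_event.set()
worker.join(timeout=1.0)

print('Main loop cycles: {}, IO reads: {}'.format(main_cycles, len(results)))
```

The main loop should complete many more cycles than the worker completes reads, because the waiting happens in the worker thread rather than in the main thread.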
Multithreading Example Code
The example Python program shows one way to implement multithreading. However, this implementation may not suit all situations, so research a suitable implementation for your specific application. The example program is experimental and so far works on a Raspberry Pi 3.
Note that multithreading works best with IO devices if the thread remains active for long durations. This is because IO devices like the game controller have very low processor utilisation.
Code Snippets
Here I will make a few notes on code snippets taken from the full Python program.
gamepadInputs = {'ABS_X': 128, 'ABS_RZ': 127, 'BTN_SOUTH': 0, 'BTN_WEST': 0, 'BTN_START': 0}
Using the button reference in the game controller image above, we can create a dictionary of the buttons we want to use. Each default value indicates the resting position of that control. The dictionary data structure allows access to button values using the button name as the key. The button names are referred to as commands throughout the program.
gamepad = ThreadedInputs()
We initialise the class that contains the gamepad and multithreading methods.
for gamepadInput in gamepadInputs:
    gamepad.append_command(gamepadInput, gamepadInputs[gamepadInput])
Before we start the gamepad object, we need to load it with the controller buttons from the dictionary created earlier. The object multithreading method will only record gamepad events from the controller buttons included in that dictionary.
gamepad.start()
We execute the object start method to begin the multithreading. This object method will start the polling of the game controller in a separate thread from the main program. Therefore, the idle game controller will no longer stop the main program thread from executing.
commandInput, commandValue = gamepad.read()
The gamepad object read method returns any new controller event update as a button name and value. If there is no event update, the read method returns ‘No Match’ and a value of 0.
We can now poll the game controller as many times as we want without blocking the main program.
if commandInput == 'ABS_X' or commandInput == 'ABS_RZ':
    # Drive and steering
    drive_control()
elif commandInput == 'BTN_SOUTH':
    # Fire Nerf dart button for example
    fire_nerf_dart(commandInput, commandValue)
elif commandInput == 'BTN_WEST':
    # Switch the LED Beacon for example
    led_beacon(commandInput, commandValue)
elif commandInput == 'BTN_START':
    # Exit the while loop - this program is closing
    break
If there is a new event update from the game controller, the gamepad object read method will return the name of the button. Python ‘if’ statements are an excellent way to apply actions to individual controller buttons.
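As the number of buttons grows, an alternative to a long chain of ‘if’ statements is a dictionary that maps each command to a handler function. This is only a sketch of that alternative, not the approach used in the example program; the handler functions below are hypothetical stand-ins that return a string instead of driving hardware.

```python
def drive_control(command, value):
    # Hypothetical stand-in for the robot drive function.
    return 'drive {} {}'.format(command, value)


def fire_nerf_dart(command, value):
    # Hypothetical stand-in for the Nerf dart function.
    return 'fire {} {}'.format(command, value)


def led_beacon(command, value):
    # Hypothetical stand-in for the LED beacon function.
    return 'beacon {} {}'.format(command, value)


# Map each command name to the function that should handle it.
handlers = {
    'ABS_X': drive_control,
    'ABS_RZ': drive_control,
    'BTN_SOUTH': fire_nerf_dart,
    'BTN_WEST': led_beacon,
}


def dispatch(command, value):
    # Look up the handler; unknown commands such as 'No Match' are ignored.
    handler = handlers.get(command)
    if handler is None:
        return None
    return handler(command, value)


print(dispatch('BTN_SOUTH', 1))
print(dispatch('No Match', 0))
```

A command that must leave the main loop, such as BTN_START, would still need its own check in the loop, because a handler function cannot execute break on the loop's behalf.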
sleep(0.01)
The short sleep slows the main loop without affecting current functions. Reducing the number of processing loops will help save battery power on a project such as a robot vehicle.
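To see roughly how the sleep value limits the loop rate, the loop can be timed. The sketch below is an assumption-based illustration: measure_loop_rate is a hypothetical helper, and the loop body contains only the sleep, so a real program with more work per cycle will run slower.

```python
import time


def measure_loop_rate(delay, cycles=20):
    # Time a fixed number of cycles that each sleep for 'delay' seconds.
    start = time.perf_counter()
    for _ in range(cycles):
        time.sleep(delay)
    elapsed = time.perf_counter() - start
    # Return the average number of cycles completed per second.
    return cycles / elapsed


rate = measure_loop_rate(0.01)
print('Approximate loop rate: {:.0f} cycles per second'.format(rate))
```

With a 0.01 second sleep the rate cannot exceed about 100 cycles per second, and in practice it is usually somewhat lower because the operating system may sleep for longer than requested.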
Class Methods
def read(self):
    # Return the latest command from gamepad event
    if not self.q.empty():
        newCommand = self.q.get()
        while not self.q.empty():
            trashBin = self.q.get()
        return newCommand, self.gamepadInputs[newCommand]
    else:
        return self.NOMATCH, 0
So, here is the gamepad read method that returns any new game controller updates. Updates are put in the queue object, but the read method only passes on the most recent entry in the queue (LIFO – last in first out) and flushes any older entries. The read method returns ‘No Match’ if the queue is empty.
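The newest-entry-wins behaviour can be tried without a controller attached. This is a minimal sketch using queue.LifoQueue from the standard library; read_latest is a hypothetical free function that mirrors the logic of the read method, and the event codes are pushed by hand rather than by a gamepad thread.

```python
import queue


def read_latest(q, no_match='No Match'):
    # Return the newest queued item and discard any older ones.
    if q.empty():
        return no_match
    newest = q.get()
    while not q.empty():
        q.get()  # flush stale entries
    return newest


events = queue.LifoQueue()
for code in ('ABS_X', 'ABS_RZ', 'BTN_SOUTH'):
    events.put(code)

latest = read_latest(events)  # newest entry, pushed last
after = read_latest(events)   # queue is now empty

print(latest, after)  # prints: BTN_SOUTH No Match
```

One thing to keep in mind with this design: if another thread adds an entry while the flush loop is running, that entry may be discarded as well. In the example program the latest value is still stored in the dictionary, so it can be recovered with the command_value method.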
def command_value(self, commandKey):
    # Get command value
    if commandKey in self.gamepadInputs:
        return self.gamepadInputs[commandKey]
    else:
        return None
The above method returns the latest value for any button in the dictionary, or None if the button is not in the dictionary.
def gamepad_update(self):
    while True:
        # Should the thread exit?
        if self.stopped:
            return
        # Code execution stops at the following line until a gamepad event occurs.
        events = get_gamepad()
        for event in events:
            event_test = self.gamepadInputs.get(event.code, self.NOMATCH)
            if event_test != self.NOMATCH:
                self.gamepadInputs[event.code] = event.state
                self.lastEventCode = event.code
                self.q.put(event.code)
The start method starts the above method to run in a separate thread. Every game controller event is collected and filtered here. If the idle game controller blocks this method, the main program thread will continue to execute.
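The filtering step can be exercised on its own, without a controller or the inputs package. In this sketch FakeEvent is a hypothetical stand-in that models only the code and state attributes used above, filter_events is a hypothetical helper, and 'SYN_REPORT' is used simply as an example of a code that is not being tracked.

```python
from collections import namedtuple

# Hypothetical stand-in for a gamepad event; only the two attributes
# used by the filtering logic are modelled.
FakeEvent = namedtuple('FakeEvent', ['code', 'state'])


def filter_events(events, tracked):
    # Update tracked values in place and return the codes that matched.
    matched = []
    for event in events:
        if event.code in tracked:
            tracked[event.code] = event.state
            matched.append(event.code)
    return matched


tracked = {'ABS_X': 128, 'BTN_SOUTH': 0}
batch = [
    FakeEvent('ABS_X', 200),
    FakeEvent('SYN_REPORT', 0),  # not tracked, so it is ignored
    FakeEvent('BTN_SOUTH', 1),
]

matched = filter_events(batch, tracked)
print(matched, tracked)
```

Only events whose code is already a key in the dictionary update a value, which is why the buttons must be loaded before the thread is started.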
The Full Multithreading Program
You can also get the full multithreading Python code example from GitHub here.
Or download the code from the Raspberry Pi Terminal by entering the following at the command prompt.
wget https://github.com/MarkAHeywood/bluetin/blob/master/multithreading-raspberry-pi-game-controller-in-python/multithreading_gamepad.py
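Copy and paste the above line to the Terminal if possible. Note that a GitHub "blob" address serves the web page for the file; if the downloaded file contains HTML rather than Python code, use the Raw button on the GitHub page to get the plain file instead.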
#!/usr/bin/env python3
"""
multithreading_gamepad.py
Mark Heywood, 2018
www.bluetin.io
10/01/2018
"""

__author__ = "Mark Heywood"
__version__ = "0.1.1"
__license__ = "MIT"

from time import sleep
from threading import Thread
import queue
import time

from inputs import get_gamepad


class ThreadedInputs:
    NOMATCH = 'No Match'

    def __init__(self):
        # Initialise gamepad command dictionary.
        # Add gamepad commands using the append method before executing the start method.
        self.gamepadInputs = {}
        self.lastEventCode = self.NOMATCH
        # Initialise the thread status flag
        self.stopped = False
        self.q = queue.LifoQueue()

    def start(self):
        # Start the thread to poll gamepad event updates
        t = Thread(target=self.gamepad_update, args=())
        t.daemon = True
        t.start()

    def gamepad_update(self):
        while True:
            # Should the thread exit?
            if self.stopped:
                return
            # Code execution stops at the following line until a gamepad event occurs.
            events = get_gamepad()
            for event in events:
                event_test = self.gamepadInputs.get(event.code, self.NOMATCH)
                if event_test != self.NOMATCH:
                    self.gamepadInputs[event.code] = event.state
                    self.lastEventCode = event.code
                    self.q.put(event.code)

    def read(self):
        # Return the latest command from gamepad event
        if not self.q.empty():
            newCommand = self.q.get()
            while not self.q.empty():
                trashBin = self.q.get()
            return newCommand, self.gamepadInputs[newCommand]
        else:
            return self.NOMATCH, 0

    def stop(self):
        # Stop the game pad thread
        self.stopped = True

    def append_command(self, newCommand, newValue):
        # Add new controller command to the list
        if newCommand not in self.gamepadInputs:
            self.gamepadInputs[newCommand] = newValue
        else:
            print('New command already exists')

    def delete_command(self, commandKey):
        # Remove controller command from list
        if commandKey in self.gamepadInputs:
            del self.gamepadInputs[commandKey]
        else:
            print('No command to delete')

    def command_value(self, commandKey):
        # Get command value
        if commandKey in self.gamepadInputs:
            return self.gamepadInputs[commandKey]
        else:
            return None


def drive_control():
    # Function to drive robot motors
    print('Speed -> {} || Value -> {}'.format('ABS_RZ', gamepad.command_value('ABS_RZ')))
    print('Direction -> {} || Value -> {}'.format('ABS_X', gamepad.command_value('ABS_X')))


def fire_nerf_dart(commandInput, commandValue):
    # Function to fire Nerf dart gun on the robot
    print('Fire Nerf Dart -> {} Value -> {}'.format(commandInput, commandValue))


def led_beacon(commandInput, commandValue):
    # Function to switch led beacon on/off on the robot
    print('Switch LED Beacon -> {} Value -> {}'.format(commandInput, commandValue))


#-----------------------------------------------------------

# Dictionary of game controller buttons we want to include.
gamepadInputs = {'ABS_X': 128,
                 'ABS_RZ': 127,
                 'BTN_SOUTH': 0,
                 'BTN_WEST': 0,
                 'BTN_START': 0}

# Initialise the gamepad object using the gamepad inputs Python package
gamepad = ThreadedInputs()


def main():
    """ Main entry point of this program """
    # Load the object with gamepad buttons we want to catch
    for gamepadInput in gamepadInputs:
        gamepad.append_command(gamepadInput, gamepadInputs[gamepadInput])
    # Start the gamepad event update thread
    gamepad.start()

    while 1:
        #timeCheck = time.time()
        # Get the next gamepad button event
        commandInput, commandValue = gamepad.read()
        # Gamepad button command filter
        if commandInput == 'ABS_X' or commandInput == 'ABS_RZ':
            # Drive and steering
            drive_control()
        elif commandInput == 'BTN_SOUTH':
            # Fire Nerf dart button for example
            fire_nerf_dart(commandInput, commandValue)
        elif commandInput == 'BTN_WEST':
            # Switch the LED Beacon for example
            led_beacon(commandInput, commandValue)
        elif commandInput == 'BTN_START':
            # Exit the while loop - this program is closing
            break
        sleep(0.01)
        #print(commandInput, commandValue)
        #print(1/(time.time() - timeCheck))

    # Stop the gamepad thread and close this program
    gamepad.stop()
    exit()


#-----------------------------------------------------------

if __name__ == "__main__":
    """ This is executed when run from the command line """
    main()
Related Articles
Raspberry Pi Controller For Robot Control Review
Robot Control with Raspberry Pi and Python
On Closing
I hope you find this article on multithreading a Raspberry Pi game controller in Python useful. Please like and share.