Using pyzmq to communicate between GUIs and processes

Graphical user interfaces (GUIs) all want to own the main thread, so they don’t play well together. Running GUIs built with different libraries concurrently and getting them to talk to one another took me a while to figure out. This article shows how I used the pyzmq library to communicate between two GUIs.

 
I am working on unique hand gesture recognition. One GUI represents a hand position. It is built with pyqt and has a few range sliders, which will represent pitch, roll and speed of motion in the final application. A second GUI represents the gesture recognition interface. For this example it is a simple label box set up in pyqtgraph. I used pyqtgraph as this is the toolkit I am using in my final application for real-time display of data from an accelerometer mounted on a hand. I based my pyzmq script on the examples here.
 
I played with the publisher-subscriber (pubsub) examples. One of the nice things about the pubsub model is that if you send something from the publisher, nothing blocks or stalls your script, even if there are no subscribers waiting for the message. However, pubsub is one-way communication only, from the publisher to the subscriber. I opted instead to use the pair model. In this pattern, a socket is set up that allows an object at each end to send messages back and forth.
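
As a minimal sketch of the pair pattern described above, the snippet below sets up both ends of a pair socket in a single script and sends one message in each direction. It uses the inproc transport and the endpoint name 'inproc://pair_demo' purely for illustration (both are my assumptions; the two GUIs in this article use tcp on port 5556 instead).

```python
import zmq

# both ends share one context because the inproc transport requires it
context = zmq.Context()

# one end binds, the other connects, exactly as with the tcp sockets
left = context.socket(zmq.PAIR)
left.bind('inproc://pair_demo')
right = context.socket(zmq.PAIR)
right.connect('inproc://pair_demo')

# left -> right
left.send_string('pitch')
request = right.recv_string()

# right -> left, over the same socket pair
right.send_string('got {}'.format(request))
reply = left.recv_string()

print(request, reply)

left.close(linger=0)
right.close(linger=0)
context.term()
```

Unlike pubsub, either end can send and either end can receive, which is what lets the label box GUI answer the rangeslider GUI if it ever needs to.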
 
Pyzmq comes with a partial implementation of the Tornado server. This is explained here. So you can set up an event loop that triggers on poll events using ioloop. If you are already using a GUI, then the odds are that you have an event handler running in that GUI. Getting these event-handling loops to play nicely with the Tornado server led me down the coding rabbit hole. So I opted to use the event loop driven by timer = QtCore.QTimer() in pyqtgraph to poll one end of the pyzmq pair socket that I set up. This is not elegant, but I can’t see a more reliable method. I am already using this QTimer to animate the sensor data that displays hand position, so it is running anyway. Whichever method I use to receive data from the hand posture GUI, at some point I have to decide to look at the data and use it. I thought about using the pyzmq.Queue structure, which is process safe. I could use this to automatically update a list in my sensor display GUI with new posture positions. But that list wouldn’t be looked at until the QTimer triggers, so I may as well simplify things and look for the updated posture position in the QTimer handling method.
 
Here’s the code I use to generate the rangeslider GUI. This can be downloaded from: github. Most of this is boilerplate to produce the GUI. The create_socket method creates the pyzmq pair socket. Note the try/except wrapper in the button_click method around socket.send_string. The ‘flags=zmq.NOBLOCK’ argument stops the call from blocking if there is nothing at the other end of the socket to receive the message; instead, send_string raises a zmq.error.Again exception, and catching it in the try/except wrapper allows the code to continue. This isn’t an issue with the pubsub model, as a publisher doesn’t care if there is no subscriber around to receive the message, but a pair socket will block on send without a receiver unless you explicitly tell it not to.
'''
Created on 10 Oct 2016

@author: matthew oppenheim
use pyzmq pair context for communication
'''

from PyQt4 import QtGui, QtCore
from qrangeslider import QRangeSlider
import sys
import zmq


class Example(QtGui.QWidget):
    
    def __init__(self):
        # the QApplication must exist before any widget is created
        app = QtGui.QApplication(sys.argv)
        super().__init__()
        self.port = 5556
        self.initUI()
        # start the Qt event loop; this call blocks until the window closes
        sys.exit(app.exec_())
        

    def initUI(self):
        self.range_duration = QRangeSlider()   
        self.range_duration.show()
        self.range_duration.setFixedWidth(300)
        self.range_duration.setFixedHeight(36)
        self.range_duration.setMin(0)
        self.range_duration.setMax(1000)
        self.range_duration.setRange(200,800)
        self.textbox = QtGui.QLineEdit()
        self.set_duration_btn = QtGui.QPushButton("send duration")
        self.set_duration_btn.clicked.connect(lambda:
            self.button_click('duration'))
        self.set_duration_btn.setFixedWidth(100)
        self.range_pitch = QRangeSlider()    
        self.range_pitch.show()    
        self.range_pitch.setFixedWidth(300)
        self.range_pitch.setFixedHeight(36)
        self.range_pitch.setMin(-80)
        self.range_pitch.setMax(80)
        self.range_pitch.setRange(-20, 20)
        self.set_pitch_btn = QtGui.QPushButton("send pitch")
        self.set_pitch_btn.setFixedWidth(100)
        self.set_pitch_btn.clicked.connect(lambda:
            self.button_click('pitch'))
        self.range_roll = QRangeSlider()    
        self.range_roll.show()    
        self.range_roll.setFixedWidth(300)
        self.range_roll.setFixedHeight(36)
        self.range_roll.setMin(-80)
        self.range_roll.setMax(80)
        self.range_roll.setRange(-20, 20)
        self.set_roll_btn = QtGui.QPushButton("send roll")
        self.set_roll_btn.setFixedWidth(100)
        self.set_roll_btn.clicked.connect(lambda: 
            self.button_click('roll'))
        hbox_duration = QtGui.QHBoxLayout()
        hbox_duration.addStretch(1)
        hbox_duration.addWidget(self.range_duration)
        hbox_duration.addWidget(self.set_duration_btn)
        hbox_pitch = QtGui.QHBoxLayout()
        hbox_pitch.addStretch(1)
        hbox_pitch.addWidget(self.range_pitch)
        hbox_pitch.addWidget(self.set_pitch_btn)

        hbox_roll = QtGui.QHBoxLayout()
        hbox_roll.addStretch(1)
        hbox_roll.addWidget(self.range_roll)
        hbox_roll.addWidget(self.set_roll_btn)

        vbox = QtGui.QVBoxLayout()
        vbox.addStretch(1)
        vbox.addLayout(hbox_pitch)
        vbox.addLayout(hbox_roll)
        vbox.addLayout(hbox_duration)
        vbox.addWidget(self.textbox)
        
        self.setLayout(vbox)    
        self.setGeometry(300, 300, 300, 150)
        self.setWindowTitle('rangesliders')
        self.socket = self.create_socket(self.port)
        self.show()
     
    @QtCore.pyqtSlot()   
    def button_click(self, message):
        ''' handle button click event '''
        self.textbox.setText('sent {}'.format(message))
        try:
            self.socket.send_string(message, flags=zmq.NOBLOCK)
        except zmq.error.Again as e:
            print('no receiver for the message: {}'.format(e))
        

    def create_socket(self, port):
        ''' create a socket using pyzmq with PAIR context '''
        context = zmq.Context()
        socket = context.socket(zmq.PAIR)
        socket.bind("tcp://*:%s" % port)
        return socket
                
if __name__ == '__main__':
    ex = Example()
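
To see the non-blocking send on its own, without the GUI, here is a small sketch that binds a pair socket with nothing connected to it and then tries to send. The use of bind_to_random_port and the variable name sent are my choices for the illustration; the rangeslider code above binds to a fixed port instead.

```python
import zmq

context = zmq.Context()
socket = context.socket(zmq.PAIR)
# pick a free port so the sketch does not clash with anything already running
port = socket.bind_to_random_port('tcp://127.0.0.1')

try:
    # no peer has connected, so a blocking send would wait here indefinitely
    socket.send_string('pitch', flags=zmq.NOBLOCK)
    sent = True
except zmq.Again:
    # zmq.Again is the same exception class as zmq.error.Again
    sent = False

print('message sent: {}'.format(sent))

socket.close(linger=0)
context.term()
```

Dropping the flags argument from send_string in this sketch makes the script hang on the send, which is the behaviour the try/except wrapper in button_click is there to avoid.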

Here’s the simple label box that I use to test out receiving messages:

'''
pyqtgraph layout with a pyzmq pair context
for testing pair messaging with pyzmq
Created on 14 Oct 2016
using qt timer and polling instead of the tornado loop in zmq
@author: matthew oppenheim
'''

import pyqtgraph as pg
from pyqtgraph.Qt import QtGui, QtCore
import zmq
import sys

FRAMES_PER_SECOND = 30

class PyqtgraphPair(QtGui.QWidget):
    def __init__(self):
        # super().__init__() already initialises the QWidget base class
        super().__init__()
        port = '5556'
        self.layout = QtGui.QVBoxLayout()
        self.setLayout(self.layout)
        self.label = QtGui.QLabel("test")
        self.set_label("new label")
        self.layout.addWidget(self.label)
        self.socket = self.create_socket(port)

        
    def create_socket(self, port):
        ''' create a pyzmq PAIR socket and connect it to the rangeslider GUI '''
        context = zmq.Context()
        socket = context.socket(zmq.PAIR)
        socket.connect('tcp://localhost:%s' % port) 
        return socket


    def set_label(self, text):
        ''' set the label to text '''
        self.label.setText(text)


    def timer_timeout(self):
        ''' handle the QTimer timeout '''
        try:
            msg = self.socket.recv(flags=zmq.NOBLOCK).decode()
            print('message received {}'.format(msg))
            self.set_label(msg)
        except zmq.error.Again:
            # nothing is waiting on the socket this tick
            return
        
        
if __name__ == '__main__':
    pg.mkQApp()
    win = PyqtgraphPair()
    win.show()
    win.resize(200,200)
    timer = QtCore.QTimer()
    timer.timeout.connect(win.timer_timeout)
    # QTimer.start expects an integer number of milliseconds
    timer.start(int(1000 / FRAMES_PER_SECOND))
    #win.set_label('hello')
    if (sys.flags.interactive != 1) or not hasattr(QtCore,
       'PYQT_VERSION'):
        QtGui.QApplication.instance().exec_()

Polling for a new message takes place in the timer_timeout method, which the QTimer calls FRAMES_PER_SECOND times a second. This has the same try/except wrapper as in the rangeslider example.
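
The timer handler above reads at most one message per tick. If several messages could arrive between two ticks, one possible variation is to keep reading until the socket is empty. The sketch below shows that idea outside of Qt; the helper name drain_messages and the inproc endpoint are hypothetical and not part of my scripts.

```python
import zmq


def drain_messages(socket):
    ''' return every message waiting on the socket, without blocking '''
    messages = []
    while True:
        try:
            messages.append(socket.recv_string(flags=zmq.NOBLOCK))
        except zmq.Again:
            # the socket is empty, so hand back what was collected
            return messages


context = zmq.Context()
receiver = context.socket(zmq.PAIR)
receiver.bind('inproc://drain_demo')
sender = context.socket(zmq.PAIR)
sender.connect('inproc://drain_demo')

# three button presses arrive before the next timer tick
for name in ('pitch', 'roll', 'duration'):
    sender.send_string(name)

# stand-in for waiting for the next timer tick
receiver.poll(timeout=1000)
received = drain_messages(receiver)
leftover = drain_messages(receiver)
print(received, leftover)

sender.close(linger=0)
receiver.close(linger=0)
context.term()
```

Inside timer_timeout, the same helper could be called on self.socket and the label set from the last message returned, if any.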

python – how to communicate between threads using pydispatcher

The pydispatcher module makes it straightforward to communicate between different threads in the same process in python.

Why would I want to do this?

I am collecting and processing sensor data from an accelerometer and want to display this in real time. The interface has some controls to save the data and to change the sampling rate of the sensor. Naturally, I want to interact with the user interface without having to wait for the sensor data to be collected and processed. I also want the sensor to be sampled continuously, without having to wait for the real-time display to update.

I run the graphical user interface (GUI) in one thread and use a separate thread to handle getting data from the sensor. This way the sensor is continuously sampled and the display remains responsive.

I use pydispatcher to send sensor measurements from the sensor thread to the display thread. I also use pydispatcher to communicate from the display thread back to the sensor thread, to control the rate at which the sensor collects data or to stop data collection. So I have two-way communication between the threads. I pass numpy arrays from the sensor thread to the display and send text from the display thread to the sensor thread. The text is then interpreted by the sensor thread to alter the sensor sampling rate, or to stop sampling. Pydispatcher does not seem to mind what kind of data is sent as messages.

The application that I have described takes up quite a lot of code and is split over several classes. So I will present the code for a simpler example, which shows how to set up and apply pydispatcher and introduces some of the features that make the library versatile.

Here is an example python 3 script that creates two threads and has them communicate. When the script is executed, its __name__ is __main__, so the block in lines 46-50 runs. A thread that instantiates the Alice class is defined and started in lines 47-48, and a separate thread that instantiates the Bob class is defined and started in lines 49-50.

In line 26 the alice_thread thread prints out a message ‘Alice is procrastinating’ every second.

In line 43 the bob_thread sends a message to the alice_thread every three seconds using a dispatcher. The alice_thread reacts to this dispatcher message by returning a message of her own to the bob_thread using a separate dispatcher.

If we look at line 15 in the Alice class, a dispatcher listener is set up:

dispatcher.connect(self.alice_dispatcher_receive, signal=BOB_SIGNAL, sender=BOB_SENDER)

This means that when a dispatcher.send statement with the signal BOB_SIGNAL and sender BOB_SENDER is executed anywhere else in the process, the method alice_dispatcher_receive will be triggered, so long as an instance of the Alice class has been created. In line 43, the Bob class calls dispatcher.send, which is designed to trigger the dispatcher listener in the Alice class described above.

dispatcher.send(message='message from Bob', signal=BOB_SIGNAL, sender=BOB_SENDER)

Having signal and sender names for each dispatcher listener and sender is a little confusing at first. Why do we have to define two identifiers for the dispatcher? Being able to define two identifiers allows us to group dispatchers from the same sender, using the sender identifier. Then we can have the same sender class sending different types of signal, for example data from different sensors, each one with the same sender identifier but a different signal identifier. This is verbose, but the verbosity makes for unambiguous, easy-to-maintain code.

Lines 6-9 define the names of the signals and senders for Alice and Bob.

When the alice_thread receives a dispatch from the bob_thread thread, she replies with a dispatch sender of her own (line 21). The corresponding dispatch listener is defined in the Bob class in line 33.

''' demonstrate the pydispatch module '''
from pydispatch import dispatcher
import threading
import time

ALICE_SIGNAL='alice_signal'
ALICE_SENDER='alice_sender'
BOB_SIGNAL='bob_signal'
BOB_SENDER='bob_sender'

class Alice():
    ''' alice procrastinates and replies to bob'''
    def __init__(self):
        print('alice instantiated')
        dispatcher.connect(self.alice_dispatcher_receive, signal=BOB_SIGNAL, sender=BOB_SENDER)
        self.alice()

    def alice_dispatcher_receive(self, message):
        ''' handle dispatcher'''
        print('alice has received message: {}'.format(message))
        dispatcher.send(message='thankyou from Alice', signal=ALICE_SIGNAL, sender=ALICE_SENDER)

    def alice(self):
        ''' loop and wait '''
        while(1):
            print('Alice is procrastinating')
            time.sleep(1)

class Bob():
    ''' bob contacts alice periodically '''
    def __init__(self):
        print('Bob instantiated')
        dispatcher.connect(self.bob_dispatcher_receive, signal=ALICE_SIGNAL, sender=ALICE_SENDER)
        self.bob()

    def bob_dispatcher_receive(self, message):
        ''' handle dispatcher '''
        print('bob has received message: {}'.format(message))

    def bob(self):
        ''' loop and send messages using a dispatcher '''
        while(1):
            dispatcher.send(message='message from Bob', signal=BOB_SIGNAL, sender=BOB_SENDER)
            time.sleep(3)

if __name__ == '__main__':
    alice_thread = threading.Thread(target=Alice)
    alice_thread.start()
    bob_thread = threading.Thread(target=Bob)
    bob_thread.start()
Output:
alice instantiated
Alice is procrastinating
Bob instantiated
alice has received message: message from Bob
bob has received message: thankyou from Alice
Alice is procrastinating
Alice is procrastinating
Alice is procrastinating
alice has received message: message from Bob
bob has received message: thankyou from Alice
Alice is procrastinating
Alice is procrastinating
alice has received message: message from Bob
bob has received message: thankyou from Alice
Alice is procrastinating
Alice is procrastinating
Alice is procrastinating
alice has received message: message from Bob
bob has received message: thankyou from Alice

To conclude, there are different ways to communicate between threads in python. I chose pydispatcher as the library allows me to write code that I can understand when I come back to it 6 months later, and I don’t have to worry about the type of message that I am passing between the threads.