Using pyzmq to communicate between GUIs and processes
Last updated: Jan 24, 2023
Graphical user interfaces (GUIs) all want to run in the main thread, so they don’t play well together. It took me a while to figure out how to run GUIs built with different libraries concurrently and get them to talk to one another. This article shows how I used the pyzmq library to communicate between two GUIs.
I am working on unique hand gesture recognition. One GUI represents a hand position. It is built with PyQt and has a few range sliders, which will be used to represent pitch, roll and speed of motion in the final application. A second GUI represents the gesture recognition interface. For this example it is a simple label box set up in pyqtgraph. I used pyqtgraph because it is the toolkit I am using in my final application for real-time display of data from an accelerometer mounted on a hand. I based my pyzmq script on the examples here.
I played with the publisher-subscriber (pubsub) examples. One of the nice things about the pubsub model is that if you send something from the publisher, nothing blocks or stalls your script, even if there are no subscribers waiting for the message. However, pubsub is one-way communication only, from the publisher to the subscriber. I opted instead to use the pair model. In this pattern, a socket is set up at each end of a connection, and the object at either end can send messages back and forth.
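A minimal sketch of the pair pattern described above, assuming both ends live in one process so that the example is self-contained. The inproc endpoint name, the function name pair_round_trip and the message text are made up for illustration and are not part of the scripts in this article.

```python
import zmq


def pair_round_trip(endpoint="inproc://pair-demo"):
    """Send a message one way over a PAIR connection and a reply back."""
    context = zmq.Context()
    server = context.socket(zmq.PAIR)
    client = context.socket(zmq.PAIR)
    # bind one end first, then connect the other end to it
    server.bind(endpoint)
    client.connect(endpoint)
    try:
        # client -> server
        client.send_string("pitch")
        request = server.recv_string()
        # server -> client, over the same pair of sockets
        server.send_string("ack " + request)
        reply = client.recv_string()
    finally:
        client.close(linger=0)
        server.close(linger=0)
        context.term()
    return request, reply


if __name__ == "__main__":
    print(pair_round_trip())
```

Either end can call send_string or recv_string at any time, which is the difference from pubsub, where only the publisher sends.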
Pyzmq comes with a partial implementation of the Tornado event loop. This is explained here. So you can set up an event loop that triggers on poll events using ioloop. If you are already using a GUI, then odds are you have an event handler running in that GUI. Getting these event-handling loops to play nicely with the Tornado loop led me down the coding rabbit hole. So I opted to use the event-handling loop set up by timer = QtCore.QTimer() in pyqtgraph to poll one end of the pyzmq pair socket that I set up. This is not aesthetic, but I can’t see a more reliable method. I am already using this QTimer to animate the sensor data that displays the hand position, so it is running anyway. Whichever method I use to receive data from the hand posture GUI, at some point I have to decide to look at the data and use it. I thought about using a process-safe queue, such as multiprocessing.Queue, to automatically update a list in my sensor display GUI with new posture positions. But that list won’t be looked at until the QTimer triggers, so I may as well simplify things and look for the updated posture position in the QTimer handling method.
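The point about the queue can be sketched as follows. This is an illustration only, under stated assumptions: queue.Queue stands in for a process-safe queue (multiprocessing.Queue offers the same put and get_nowait calls), and the function name latest_posture and the message strings are hypothetical. Even with a queue, the timer handler still has to poll it on every tick, which is the same work as polling the socket directly.

```python
import queue


def latest_posture(pending):
    """Empty the queue without blocking and return the newest item, or None."""
    latest = None
    while True:
        try:
            latest = pending.get_nowait()
        except queue.Empty:
            # nothing left to read on this tick
            return latest


# stand-in for posture updates arriving between two timer ticks
postures = queue.Queue()
for value in ("pitch 10", "pitch 12", "pitch 15"):
    postures.put(value)

first_tick = latest_posture(postures)   # newest of the three updates
second_tick = latest_posture(postures)  # no new data since the last tick
```

The handler body has the same shape as a non-blocking socket read wrapped in try/except, so the queue adds a layer without removing the poll.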
Here’s the code I use to generate the rangeslider GUI. This can be downloaded from: github. Most of this is boilerplate to produce the GUI. The create_socket method creates the pyzmq pair socket. Note the try/except wrapper around socket.send_string in the button_click method. The ‘flags=zmq.NOBLOCK’ argument stops the code from blocking if there is nothing at the other end of the socket to receive the message. In that case send_string raises a zmq.error.Again exception instead, and the try/except wrapper allows the code to continue. This isn’t an issue with the pubsub model: a publisher doesn’t care if there is no subscriber around to receive the message. A pair socket, however, will stall without a receiver unless you explicitly tell it not to block.
'''
use pyzmq pair context for communication
@author: matthew oppenheim
'''
from PyQt4 import QtGui, QtCore
from qrangeslider import QRangeSlider
import sys
import zmq


class Example(QtGui.QWidget):

    def __init__(self):
        # the QApplication has to exist before any widget is created
        app = QtGui.QApplication(sys.argv)
        super().__init__()
        self.port = 5556
        self.topic = "1"
        self.initUI()
        sys.exit(app.exec_())

    def initUI(self):
        # duration slider and its send button
        self.range_duration = QRangeSlider()
        self.range_duration.show()
        self.range_duration.setFixedWidth(300)
        self.range_duration.setFixedHeight(36)
        self.range_duration.setMin(0)
        self.range_duration.setMax(1000)
        self.range_duration.setRange(200, 800)
        self.textbox = QtGui.QLineEdit()
        self.set_duration_btn = QtGui.QPushButton("send duration")
        self.set_duration_btn.clicked.connect(lambda: self.button_click('duration'))
        self.set_duration_btn.setFixedWidth(100)
        # pitch slider and its send button
        self.range_pitch = QRangeSlider()
        self.range_pitch.show()
        self.range_pitch.setFixedWidth(300)
        self.range_pitch.setFixedHeight(36)
        self.range_pitch.setMin(-80)
        self.range_pitch.setMax(80)
        self.range_pitch.setRange(-20, 20)
        self.set_pitch_btn = QtGui.QPushButton("send pitch")
        self.set_pitch_btn.setFixedWidth(100)
        self.set_pitch_btn.clicked.connect(lambda: self.button_click('pitch'))
        # roll slider and its send button
        self.range_roll = QRangeSlider()
        self.range_roll.show()
        self.range_roll.setFixedWidth(300)
        self.range_roll.setFixedHeight(36)
        self.range_roll.setMin(-80)
        self.range_roll.setMax(80)
        self.range_roll.setRange(-20, 20)
        self.set_roll_btn = QtGui.QPushButton("send roll")
        self.set_roll_btn.setFixedWidth(100)
        self.set_roll_btn.clicked.connect(lambda: self.button_click('roll'))
        # one row per slider: slider on the left, button on the right
        hbox_duration = QtGui.QHBoxLayout()
        hbox_duration.addStretch(1)
        hbox_duration.addWidget(self.range_duration)
        hbox_duration.addWidget(self.set_duration_btn)
        hbox_pitch = QtGui.QHBoxLayout()
        hbox_pitch.addStretch(1)
        hbox_pitch.addWidget(self.range_pitch)
        hbox_pitch.addWidget(self.set_pitch_btn)
        hbox_roll = QtGui.QHBoxLayout()
        hbox_roll.addStretch(1)
        hbox_roll.addWidget(self.range_roll)
        hbox_roll.addWidget(self.set_roll_btn)
        # stack the rows vertically, with the text box at the bottom
        vbox = QtGui.QVBoxLayout()
        vbox.addStretch(1)
        vbox.addLayout(hbox_pitch)
        vbox.addLayout(hbox_roll)
        vbox.addLayout(hbox_duration)
        vbox.addWidget(self.textbox)
        self.setLayout(vbox)
        self.setGeometry(300, 300, 300, 150)
        self.setWindowTitle('rangesliders')
        self.socket = self.create_socket(self.port)
        self.show()

    @QtCore.pyqtSlot()
    def button_click(self, message):
        ''' handle button click event '''
        self.textbox.setText('sent {}'.format(message))
        try:
            # NOBLOCK: raise zmq.error.Again instead of waiting for a receiver
            self.socket.send_string(message, flags=zmq.NOBLOCK)
        except zmq.error.Again as e:
            print('no receiver for the message: {}'.format(e))

    def create_socket(self, port):
        ''' create a socket using pyzmq with PAIR context '''
        context = zmq.Context()
        socket = context.socket(zmq.PAIR)
        socket.bind("tcp://*:%s" % port)
        return socket


if __name__ == '__main__':
    ex = Example()
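The difference between the two patterns when nobody is listening can be checked with a small sketch. It assumes inproc endpoints so that no network port is needed. The names try_send and compare_sends and the endpoint strings are illustrative and do not appear in the script above.

```python
import zmq


def try_send(socket, message):
    """Send without blocking; return False if zmq reports nobody can take it."""
    try:
        socket.send_string(message, flags=zmq.NOBLOCK)
        return True
    except zmq.error.Again:
        return False


def compare_sends():
    """Send on a PAIR and on a PUB socket that each have no peer connected."""
    context = zmq.Context()
    pair = context.socket(zmq.PAIR)
    pub = context.socket(zmq.PUB)
    pair.bind("inproc://no-peer-pair")
    pub.bind("inproc://no-peer-pub")
    try:
        pair_sent = try_send(pair, "pitch")
        pub_sent = try_send(pub, "pitch")
    finally:
        pair.close(linger=0)
        pub.close(linger=0)
        context.term()
    return pair_sent, pub_sent


if __name__ == "__main__":
    print(compare_sends())
```

With no peer attached, the pair send should be refused with zmq.error.Again, so try_send returns False, while the publisher send returns True because the publisher simply drops the message.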
Here’s the simple label box that I use to test out receiving messages:
'''
pyqtgraph layout with a pyzmq pair context
for testing pubsub messaging with pyzmq
Created on 14 Oct 2016
using qt timer and polling instead of the tornado loop in zmq
@author: matthew oppenheim
'''
import pyqtgraph as pg
from pyqtgraph.Qt import QtGui, QtCore
import zmq
import sys

FRAMES_PER_SECOND = 30


class PyqtgraphPair(QtGui.QWidget):

    def __init__(self):
        super().__init__()
        port = '5556'
        topic = '1'
        self.layout = QtGui.QVBoxLayout()
        self.setLayout(self.layout)
        self.label = QtGui.QLabel("test")
        self.set_label("new label")
        self.layout.addWidget(self.label)
        self.socket = self.create_socket(port)

    def create_socket(self, port):
        ''' connect a PAIR socket to the rangeslider GUI '''
        context = zmq.Context()
        socket = context.socket(zmq.PAIR)
        socket.connect('tcp://localhost:%s' % port)
        return socket

    def process_message(self, message):
        ''' process a pubsub style "topic text" message (not used by the timer polling) '''
        message = message[0].decode()
        message = message.split()[1]
        print('sub received {}'.format(message))
        self.set_label('changed')
        if message == 'exit':
            # no tornado ioloop is running in this version, so close the window
            self.close()

    def set_label(self, text):
        ''' set the label to text '''
        self.label.setText(text)

    def timer_timeout(self):
        ''' handle the QTimer timeout '''
        try:
            # NOBLOCK: raise zmq.error.Again if no message is waiting
            msg = self.socket.recv(flags=zmq.NOBLOCK).decode()
            print('message received {}'.format(msg))
            self.set_label(msg)
        except zmq.error.Again:
            return


if __name__ == '__main__':
    pg.mkQApp()
    win = PyqtgraphPair()
    win.show()
    win.resize(200, 200)
    timer = QtCore.QTimer()
    timer.timeout.connect(win.timer_timeout)
    # QTimer.start expects a whole number of milliseconds
    timer.start(1000 // FRAMES_PER_SECOND)
    if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
        QtGui.QApplication.instance().exec_()
Polling for a new message takes place in the timer_timeout method. This has the same try/except wrapper as in the rangeslider example.
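The handler above reads at most one message per timer tick. If several messages can arrive between ticks, one possible variation is to keep reading until zmq.error.Again is raised. The sketch below assumes an inproc connection so that it runs without a GUI. The names drain_pending and demo and the endpoint string are hypothetical.

```python
import zmq


def drain_pending(socket):
    """Return every message currently waiting on the socket, without blocking."""
    messages = []
    while True:
        try:
            messages.append(socket.recv_string(flags=zmq.NOBLOCK))
        except zmq.error.Again:
            # nothing more to read right now
            return messages


def demo():
    """Queue two messages, then poll twice as two timer ticks would."""
    context = zmq.Context()
    receiver = context.socket(zmq.PAIR)
    sender = context.socket(zmq.PAIR)
    receiver.bind("inproc://drain-demo")
    sender.connect("inproc://drain-demo")
    try:
        sender.send_string("pitch")
        sender.send_string("roll")
        first_tick = drain_pending(receiver)
        second_tick = drain_pending(receiver)
    finally:
        sender.close(linger=0)
        receiver.close(linger=0)
        context.term()
    return first_tick, second_tick


if __name__ == "__main__":
    print(demo())
```

In a QTimer handler, a function like drain_pending would be called once per timeout and the label updated from the last item of the returned list, if there is one.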