Skip to content

DaqZmq

Module for transferring ADC data via ZeroMQ.

This module provides classes for publishing and subscribing to DAQ devices data using ZeroMQ sockets. It enables efficient data transfer over a network, allowing for real-time communication between data acquisition systems and client applications.

Usage

The module includes two main classes: - DaqPublisher: Publishes ADC data to a specified TCP address. - DaqSubscriber: Subscribes to ADC data from a specified TCP address.

Examples:

Publishing ADC data:

>>> publisher = DaqPublisher(host="127.0.0.1", port=50001)
>>> publisher.send_data(np.array([1, 2, 3]), packet_num=1, timestamp=1623540000.0)
>>> publisher.terminate()

Subscribing to ADC data:

>>> subscriber = DaqSubscriber(host="127.0.0.1", port=50001)
>>> metadata, data = subscriber.recv_data()
>>> subscriber.terminate()

Classes:

Name Description
DaqPublisher

Publishes ADC data and metadata over a ZeroMQ socket.

DaqSubscriber

Subscribes to ADC data and metadata over a ZeroMQ socket.

DaqPublisher

Bases: object

Publishes ADC data and metadata over a ZeroMQ socket.

DaqPublisher is used to send ADC data along with metadata to subscribers over a network using the PUB-SUB pattern of ZeroMQ. It allows for efficient broadcasting of data to multiple clients.

Attributes:

Name Type Description
zmq_context

The ZeroMQ context for managing socket connections.

sock

The ZeroMQ PUB socket for data transmission.

_daq_info dict

A dictionary containing DAQ configuration information.

_data_columns dict

A dictionary mapping data columns to DAQ channels.

Methods:

Name Description
send_data

Sends measurement data and metadata.

terminate

Closes the ZeroMQ socket and destroys the context.

Examples:

>>> publisher = DaqPublisher(host="127.0.0.1", port=50001)
>>> publisher.send_data(np.array([1, 2, 3]), packet_num=1, timestamp=1623540000.0)
>>> publisher.terminate()

__init__(daq_info, data_columns, host='127.0.0.1', port=50001)

Initialize the DaqPublisher instance.

Sets up a ZeroMQ PUB socket to publish data to the specified host and port.

Parameters:

Name Type Description Default
daq_info DaqInfo

The DAQ configuration to be published.

required
data_columns dict

A dictionary mapping data columns to DAQ channels.

required
host str

The IP address (or hostname) to bind the publisher to (default: "127.0.0.1").

'127.0.0.1'
port int

The port number to bind the publisher to (default: 50001).

50001

send_data(m_data, packet_num, timestamp, sync_status=False)

Send measurement data along with metadata.

Sends ADC data as a numpy array, accompanied by metadata such as timestamp, packet number, and synchronization status.

Parameters:

Name Type Description Default
m_data ndarray

The measurement data to be sent.

required
packet_num int

The packet number for the data.

required
timestamp float

The timestamp associated with the data.

required
sync_status bool

Indicates if the data is synchronized (default: False).

False

Returns:

Type Description
int

Number of bytes sent.

terminate()

Terminate the publisher by closing the socket and destroying the context.

Properly closes the ZeroMQ socket and terminates the context to release resources.

DaqSubscriber

Bases: object

Subscribes to ADC data and metadata over a ZeroMQ socket.

DaqSubscriber connects to a ZeroMQ publisher and receives ADC data along with metadata. It allows clients to listen for data broadcasts from a DaqPublisher.

Attributes:

Name Type Description
zmq_context

The ZeroMQ context for managing socket connections.

sock

The ZeroMQ SUB socket for data reception.

timestamp float

The timestamp of the last received data packet.

daq_info DaqInfo

The DAQ configuration of the received data.

data_columns dict

A dictionary mapping data columns to DAQ channels.

packet_num int

The packet number of the last received data.

sync_status bool

Indicates if master clock is synchronized

Methods:

Name Description
recv_data

Receives a numpy array along with its metadata.

terminate

Closes the ZeroMQ socket and destroys the context.

Examples:

>>> subscriber = DaqSubscriber(host="127.0.0.1", port=50001)
>>> metadata, data = subscriber.recv_data()
>>> subscriber.terminate()

__init__(host='127.0.0.1', port=50001, init_daqinfo=True, connect_timeout=1.0)

Initialize the DaqSubscriber instance.

Sets up a ZeroMQ SUB socket to receive data from the specified host and port. Optionally attempts to retrieve the initial DAQ metadata upon connection.

Parameters:

Name Type Description Default
host str

The IP address to connect to (default: "127.0.0.1").

'127.0.0.1'
port int

The port number to connect to (default: 50001).

50001
init_daqinfo bool

Whether to retrieve initial DAQ metadata from the publisher (default: True).

True
connect_timeout float

The timeout duration (in seconds) for connection attempts (default: 1.0).

1.0

recv_data(update_daqinfo=False, flags=0)

Receive a numpy array along with its metadata.

Waits for incoming data and metadata from the publisher, reconstructing the numpy array from the received buffer.

Parameters:

Name Type Description Default
update_daqinfo bool

Whether to update DAQ metadata upon receiving the packet (default: False).

False
flags int

Optional ZeroMQ flags to pass for receiving data (default: 0).

0

Examples:

>>> data = subscriber.recv_data()

terminate()

Terminate the subscriber by closing the socket and destroying the context.

Properly closes the ZeroMQ socket and terminates the context to release resources.