ChannelBuffer

Module for buffering acquisition and channel data.

This module provides classes for buffering data acquired from data acquisition systems. It includes AcqBufferPool for managing buffers for multiple channels with a single timebase, and AcqBuffer for implementing a cyclic buffer for individual channels.

Usage

The primary classes in this module are AcqBufferPool and AcqBuffer. AcqBufferPool facilitates buffering data for multiple channels, allowing for convenient storage and retrieval of both channel data and timestamps. AcqBuffer is used for buffering data in a cyclic manner, with support for scaling and delaying the data.

Examples:

Create an AcqBufferPool instance and add data with timestamps:

>>> from channelbuffer import AcqBufferPool
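>>> my_daq_info = DaqInfo(...)  # Assume this is an initialized DaqInfo object
>>> my_data_columns = ...  # Assume this is a dict mapping AI pins to data column indices
>>> buffer_pool = AcqBufferPool(daq_info=my_daq_info, data_columns=my_data_columns)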
>>> buffer_pool.put_data_with_timestamp(data_array, timestamp_us=12345678)

Classes:

Name Description
AcqBufferPool

Manages buffers for multiple channels with a single timebase.

AcqBuffer

Implements a cyclic buffer for individual channels.

AcqBuffer

Bases: object

__init__(size=100000, scale_gain=1.0, scale_offset=0.0, sample_delay=0, dtype=np.float32, name=None)

Initialize the AcqBuffer instance for buffering acquired data in a cyclic manner.

Sets up a cyclic buffer with the specified size, data type, scaling, and offset properties. Optionally, a sample delay can be specified to adjust data reads.

Parameters:

Name Type Description Default
size int

Size of the buffer (number of elements).

100000
scale_gain float

Gain applied to the data upon storage.

1.0
scale_offset float

Offset applied to the data upon storage.

0.0
sample_delay int

Delay applied to the data during reads.

0
dtype dtype

Datatype of the buffer.

float32
name str

Optional name for the buffer instance.

None

put_data(data)

Add data to the buffer in a cyclic manner.

Inserts the provided data into the buffer, applying the configured gain and offset. If the data size exceeds the available space, the buffer wraps around and overwrites the oldest data.

Parameters:

Name Type Description Default
data array

A numpy array of data to be added to the buffer.

required

Returns:

Type Description
int

The updated total sample count in the buffer.
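
The wrap-around write described above can be illustrated with a minimal sketch. `CyclicBufferSketch` is a hypothetical stand-in, not the module's AcqBuffer, and the scaling order (gain first, then offset) is an assumption.

```python
import numpy as np


class CyclicBufferSketch:
    """Hypothetical stand-in for a cyclic buffer; not the module's AcqBuffer."""

    def __init__(self, size=8, scale_gain=1.0, scale_offset=0.0, dtype=np.float32):
        self._data = np.zeros(size, dtype=dtype)
        self._size = size
        self._gain = scale_gain
        self._offset = scale_offset
        self.sample_count = 0  # total number of samples ever written

    def put_data(self, data):
        # Scale on storage. Applying gain before offset is an assumption.
        scaled_all = np.asarray(data, dtype=np.float64) * self._gain + self._offset
        # If the chunk is larger than the buffer, only the newest samples survive.
        scaled = scaled_all[-self._size:]
        first_abs_idx = self.sample_count + len(scaled_all) - len(scaled)
        # Map absolute sample indices onto buffer slots; the modulo wraps around.
        slots = (first_abs_idx + np.arange(len(scaled))) % self._size
        self._data[slots] = scaled
        self.sample_count += len(scaled_all)
        return self.sample_count
```

Tracking the total sample count rather than only a write pointer keeps absolute sample indices meaningful after the buffer has wrapped.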

read_data_by_index(start_idx, stop_idx)

Read data from the buffer by specifying a sample index range.

Retrieves data from the buffer starting at start_idx up to, but not including, stop_idx, applying the sample delay if configured. The requested range may wrap around the end of the cyclic buffer.

Parameters:

Name Type Description Default
start_idx int

Starting sample index (inclusive).

required
stop_idx int

Stopping sample index (exclusive).

required

Returns:

Type Description
ndarray

An array containing the requested data range. Returns None if the indices are out of bounds.
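
A read by absolute sample index could look roughly like the sketch below. `read_by_index` is a hypothetical helper; the direction in which the delay shifts the window and the exact bounds checks are assumptions, not confirmed behavior of AcqBuffer.

```python
import numpy as np


def read_by_index(buffer, sample_count, start_idx, stop_idx, sample_delay=0):
    """Read absolute sample indices [start_idx, stop_idx) from a cyclic array.

    Hypothetical helper. `sample_count` is the total number of samples ever
    written to `buffer`.
    """
    size = len(buffer)
    # Assumption: the delay shifts the requested window forward.
    start = start_idx + sample_delay
    stop = stop_idx + sample_delay
    # Samples older than this have already been overwritten.
    oldest = max(sample_count - size, 0)
    if start < oldest or stop > sample_count or stop < start:
        return None
    # The modulo maps absolute indices to slots, so a window that crosses
    # the end of the array is stitched together in the correct order.
    slots = np.arange(start, stop) % size
    return buffer[slots]
```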

reset()

Reset the buffer, clearing all stored data.

Resets the buffer to its initial state by zeroing out all data and resetting the write index and sample count.

AcqBufferPool

Bases: object

Manages buffers for multiple channels with a single timebase.

AcqBufferPool facilitates the buffering of data for more than one channel, all sharing a common timebase. It provides methods for adding data and timestamps, maintaining synchronization across channels.

Attributes:

Name Type Description
channel dict

A dictionary mapping channel names to their respective AcqBuffer instances.

time AcqBuffer

Buffer for storing timestamps.

actual_sidx int

Index of the last sample added to the buffers.

_daq_info DaqInfo

Information about the DAQ system, including channel configurations.

_data_columns dict

Maps analog input pins to data column indices.

_buffer_size int

Number of samples in each channel's buffer.

_last_timestamp_us int

Stores the last timestamp in microseconds.

_time_batch_array array

Array used to generate timestamp batches.

Methods:

Name Description
put_data

Adds data to the channel buffers.

add_timestamp

Adds timestamps to the time buffer.

put_data_with_timestamp

Adds channel data and timestamps to the buffers.

put_data_with_samplerate

Adds data and updates timestamps using sample rate.

Examples:

>>> buffer_pool = AcqBufferPool(daq_info=my_daq_info, data_columns=my_data_columns, size=50000)
>>> buffer_pool.put_data_with_timestamp(data_array, timestamp_us=987654321)

__init__(daq_info, data_columns, size=100000, start_timestamp_us=0)

Initialize the AcqBufferPool instance for buffering multi-channel data.

Sets up the buffers for each channel as defined in the provided DaqInfo object and prepares the time buffer for storing timestamps.

Parameters:

Name Type Description Default
daq_info DaqInfo

An instance of DaqInfo to configure the channel buffers.

required
data_columns dict

Dictionary mapping AI pins to data column indices.

required
size int

Number of samples in each buffer.

100000
start_timestamp_us int

Starting timestamp offset in microseconds.

0

add_timestamp(timestamp_us, num_samples)

Add timestamps to the time buffer.

Generates a series of timestamps based on the provided timestamp_us and num_samples, and stores them in the time buffer.

Parameters:

Name Type Description Default
timestamp_us int

Timestamp of the most recent sample in microseconds.

required
num_samples int

Number of samples to generate timestamps for.

required
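
One plausible way to generate such a batch is sketched below. It assumes the samples are evenly spaced between the previous batch's last timestamp and timestamp_us. The source does not state the spacing rule, and `batch_timestamps` is a hypothetical helper.

```python
import numpy as np


def batch_timestamps(last_timestamp_us, timestamp_us, num_samples):
    """Return num_samples timestamps, the last one equal to timestamp_us.

    Assumption: even spacing between the previous batch's last timestamp
    (exclusive) and timestamp_us (inclusive).
    """
    # Fractions 1/n, 2/n, ..., 1 of the interval covered by this batch.
    steps = np.arange(1, num_samples + 1, dtype=np.float64) / num_samples
    return last_timestamp_us + steps * (timestamp_us - last_timestamp_us)
```

With this scheme the newest sample always carries exactly the timestamp reported by the acquisition system, and earlier samples are interpolated.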

put_data(data)

Add data to the channel buffers.

Distributes the provided data across the buffers for each channel. The number of columns in data must match the number of channels defined in daq_info.

Parameters:

Name Type Description Default
data array

A 2D numpy array where each column corresponds to a channel's data.

required
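
The column distribution can be pictured with the following sketch. The pin names, the channel names, and `split_columns` are hypothetical. How DaqInfo actually exposes the channel-to-pin assignment is not shown in this documentation.

```python
import numpy as np

# Hypothetical mapping from analog input pin to column index in `data`.
data_columns = {"AI0": 0, "AI2": 1}
# Hypothetical mapping from channel name to the pin it is wired to.
channel_pins = {"U1": "AI0", "I1": "AI2"}


def split_columns(data, channel_pins, data_columns):
    """Return {channel_name: 1D array} by picking each channel's column."""
    if data.ndim != 2 or data.shape[1] != len(data_columns):
        raise ValueError("data must be 2D with one column per configured pin")
    return {
        name: data[:, data_columns[pin]]
        for name, pin in channel_pins.items()
    }
```

Each resulting 1D array would then be handed to the matching channel buffer.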

put_data_with_samplerate(data, samplerate)

Add channel data to the buffers and extend time axis with sample rate.

Adds the provided data to the channel buffers and extends the time buffer with timestamps derived from the sample rate.

Parameters:

Name Type Description Default
data array

A 2D numpy array where each column corresponds to a channel's data.

required
samplerate float

Samplerate of the data acquired.

required
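
Deriving timestamps from a sample rate amounts to stepping forward by one sample period per sample. The sketch below assumes the rate is given in Hz and that timestamps continue from the last stored timestamp. `timestamps_from_samplerate` is a hypothetical helper.

```python
import numpy as np


def timestamps_from_samplerate(last_timestamp_us, num_samples, samplerate):
    """Return timestamps in microseconds for num_samples new samples.

    Assumption: samplerate is in Hz, and the first new sample lies one
    sample period after last_timestamp_us.
    """
    period_us = 1e6 / samplerate
    return last_timestamp_us + period_us * np.arange(1, num_samples + 1)
```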

put_data_with_timestamp(data, timestamp_us)

Add channel data to the buffers along with the corresponding timestamp.

Adds the provided data to the channel buffers and updates the time buffer with the given timestamp_us.

Parameters:

Name Type Description Default
data array

A 2D numpy array where each column corresponds to a channel's data.

required
timestamp_us int

Timestamp of the most recent sample in microseconds.

required

reset()

Reset all underlying buffers, clearing all stored data.

Resets each buffer to its initial state by zeroing out all data and resetting the write index and sample count.

DataChannelBuffer

Bases: object

A class to manage data buffering for a data channel, supporting operations such as data insertion, retrieval, and aggregation.

Attributes:

Name Type Description
name str

Name of the data channel.

unit str

Unit of the data.

agg_type str

Type of aggregation to apply (e.g., 'rms', 'max', 'phi').

_data ndarray

Buffer for storing data samples.

_acq_sidx ndarray

Array for storing acquisition indices.

sample_count int

Number of samples stored in the buffer.

last_write_idx int

Index of the last written sample in the buffer.

last_sample_value float | ndarray

Value of the last sample added.

last_sample_ts int

Timestamp of the last sample added.

Methods:

Name Description
put_data_single

Inserts a single data sample into the buffer.

put_data_multi

Inserts multiple data samples into the buffer.

read_data_by_acq_sidx

Reads data samples from the buffer based on acquisition indices.

_read_data_by_idx

Internal method to retrieve data by array indices.

read_agg_data_by_acq_sidx

Retrieves aggregated data from the buffer based on acquisition indices.

__init__(name, size=5000, sample_dimension=1, agg_type=None, unit='', agg_function=None, *args, **kwargs)

Initializes a DataChannelBuffer instance.

Parameters:

Name Type Description Default
name str

Name of the data channel.

required
size int

Size of the buffer. Defaults to 5000.

5000
sample_dimension int

Dimension of each sample. Defaults to 1.

1
agg_type str

Type of aggregation to apply ('rms', 'max', 'phi'). Defaults to None.

None
unit str

Unit of the data. Defaults to an empty string.

''
agg_function

Custom function used for aggregation. Provide any additional named parameters it requires as keyword arguments.

None

put_data_multi(acq_sidx, values)

Inserts multiple data samples into the buffer.

Parameters:

Name Type Description Default
acq_sidx ndarray

Acquisition indices or timestamps of the samples.

required
values

The data samples to insert.

required

Returns:

Name Type Description
int int

Updated sample count in the buffer.

put_data_single(acq_sidx, value)

Inserts a single data sample into the buffer.

Parameters:

Name Type Description Default
acq_sidx int

Acquisition index or timestamp of the sample.

required
value float | float64 | ndarray

The data sample to insert.

required

Returns:

Name Type Description
int int

Updated sample count in the buffer.

read_agg_data_by_acq_sidx(start_idx, stop_idx, include_next=False)

Retrieves aggregated data from the buffer based on acquisition indices.

Parameters:

Name Type Description Default
start_idx int

Start acquisition index.

required
stop_idx int

Stop acquisition index.

required
include_next bool

If True, includes the next sample after the stop index and, if more than one sample is returned, excludes the first. Defaults to False.

False

Returns:

Type Description
float | list

float | np.ndarray: The aggregated data based on the specified aggregation type.
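
How the aggregation types might map to computations is sketched below. `aggregate` is a hypothetical helper: the 'phi' type is omitted because its definition is not given here, the fallback to a mean is an assumption, and passing extra keyword arguments through to the custom function is inferred from the constructor description.

```python
import numpy as np


def aggregate(values, agg_type=None, agg_function=None, **kwargs):
    """Reduce a window of samples to a single value (hypothetical helper)."""
    values = np.asarray(values, dtype=np.float64)
    if agg_function is not None:
        # A custom function takes precedence; extra named parameters are
        # forwarded to it unchanged.
        return agg_function(values, **kwargs)
    if agg_type == "rms":
        return float(np.sqrt(np.mean(values ** 2)))
    if agg_type == "max":
        return float(np.max(values))
    # Assumption: fall back to the arithmetic mean for other types.
    return float(np.mean(values))
```

For example, `aggregate(window, agg_function=np.percentile, q=50)` would return the median of `window`.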

read_data_by_acq_sidx(start_idx, stop_idx, include_next=False)

Reads data samples from the buffer based on acquisition indices.

Parameters:

Name Type Description Default
start_idx int

Start acquisition index.

required
stop_idx int

Stop acquisition index.

required
include_next bool

If True, includes the next sample after the stop index and, if more than one sample is returned, excludes the first. Defaults to False.

False

Returns:

Name Type Description
tuple Tuple[ndarray, ndarray]

A tuple containing the data and acq_sidx.
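
The lookup by acquisition index can be approximated with a binary search over the stored indices. This sketch assumes acq_sidx is sorted ascending, the stop index is exclusive, and include_next behaves as described in the parameter table. `read_by_acq_sidx` is a hypothetical helper that ignores the cyclic layout of the real buffer.

```python
import numpy as np


def read_by_acq_sidx(acq_sidx, data, start_idx, stop_idx, include_next=False):
    """Return (data, acq_sidx) for samples with start_idx <= index < stop_idx.

    Hypothetical helper; `acq_sidx` must be sorted in ascending order and
    aligned element by element with `data`.
    """
    lo = int(np.searchsorted(acq_sidx, start_idx, side="left"))
    hi = int(np.searchsorted(acq_sidx, stop_idx, side="left"))
    if include_next:
        # Extend the window by the first sample at or after the stop index.
        if hi < len(acq_sidx):
            hi += 1
        # Assumed reading of "excludes first if many": drop the first
        # sample when more than one would be returned.
        if hi - lo > 1:
            lo += 1
    return data[lo:hi], acq_sidx[lo:hi]
```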