StorageController

The pqopen/storagecontroller.py module provides a flexible framework for managing data storage across multiple endpoints. It supports various storage methods, including CSV files, MQTT (for IoT integration), and Home Assistant-specific MQTT topics.

The module contains the following key components:

  • StorageEndpoint: An abstract base class defining the interface for writing data to different storage endpoints.
  • StoragePlan: Manages the configuration for storing data from specific channels at specified intervals.
  • StorageController: Orchestrates the processing and storage of data across all configured plans.

Features

  • Supports multiple storage endpoints (e.g., CSV, MQTT, Home Assistant).
  • Handles both raw time-series data and aggregated data.
  • Enables event-driven data storage.
  • Designed for flexibility in integrating different storage methods.

Usage

The module is intended for applications that need to store measurement data locally, transmit it to remote services, or both. Typical uses are power quality applications and real-time monitoring systems.

Examples:

from pqopen.storagecontroller import StorageController

endpoints = {
    "csv": {"data_dir": "/tmp/"},
    "mqtt": {
        "hostname": "localhost",
        "client_id": "pqopen-mqtt",
        "topic_prefix": "dt/pqopen",
    },
}
storage_plans = {
    "plan1": {
        "endpoint": "csv",
        "interval_sec": 600,
        "channels": ["channel1", "channel2"],
    },
    "plan2": {
        "endpoint": "mqtt",
        "interval_sec": 10,
        "store_events": True,
    },
}
# time_channel, sample_rate, channels and start_ts are assumed to be
# provided by the surrounding acquisition setup.
# Initialize the StorageController and configure endpoints and plans
storage_controller = StorageController(time_channel, sample_rate)
storage_controller.setup_endpoints_and_storageplans(
    endpoints=endpoints,
    storage_plans=storage_plans,
    available_channels=channels,
    measurement_id="unique_measurement_id",
    device_id="device_123",
    start_timestamp_us=start_ts,
)

For detailed usage and implementation details, refer to the individual class documentation within the module.
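
Each storage plan refers to an endpoint by name, so a quick consistency check on the two dicts can catch typos before setup. The following is a minimal sketch; `check_storage_config` is a hypothetical helper, not part of pqopen, and it only checks the `endpoint` key used in the example.

```python
def check_storage_config(endpoints: dict, storage_plans: dict) -> list:
    """Return a list of problems found in the configuration (hypothetical helper)."""
    problems = []
    for plan_name, plan in storage_plans.items():
        endpoint = plan.get("endpoint")
        # Every plan must point at an endpoint that is actually configured.
        if endpoint not in endpoints:
            problems.append(f"{plan_name}: unknown endpoint {endpoint!r}")
    return problems


if __name__ == "__main__":
    endpoints = {"csv": {"data_dir": "/tmp/"}}
    storage_plans = {
        "plan1": {"endpoint": "csv", "interval_sec": 600},
        "plan2": {"endpoint": "mqtt", "interval_sec": 10},
    }
    for problem in check_storage_config(endpoints, storage_plans):
        print(problem)
```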

CsvStorageEndpoint

Bases: StorageEndpoint

Represents a CSV storage endpoint.

__init__(name, measurement_id, file_path)

Create a CSV storage endpoint.

Parameters:

Name Type Description Default
name str

The name of the endpoint

required
measurement_id str

Id of the measurement, included in the transmitted data

required
file_path str | Path

Data path for the CSV file

required
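
To illustrate what a CSV endpoint could do with aggregated data, here is a minimal sketch using the standard `csv` module. The row layout (a `timestamp_us` column followed by one column per channel) and the helper name `append_aggregated_row` are assumptions for illustration; the actual file layout written by CsvStorageEndpoint is not specified here.

```python
import csv
from pathlib import Path


def append_aggregated_row(file_path, data: dict, timestamp_us: int) -> None:
    """Append one aggregated data set as a CSV row (hypothetical layout)."""
    path = Path(file_path)
    # Write the header only when the file is new or still empty.
    write_header = not path.exists() or path.stat().st_size == 0
    fieldnames = ["timestamp_us", *data.keys()]
    with path.open("a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if write_header:
            writer.writeheader()
        writer.writerow({"timestamp_us": timestamp_us, **data})
```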

HomeAssistantStorageEndpoint

Bases: StorageEndpoint

Represents an MQTT endpoint for Home Assistant.

__init__(name, device_id, mqtt_host, client_id, topic_prefix='pqopen/data', qos=0)

Create an endpoint for HomeAssistant via MQTT

Parameters:

Name Type Description Default
name str

The name of the endpoint

required
device_id str

The device Id

required
mqtt_host str

hostname of the MQTT broker.

required
client_id str

name to be used for mqtt client identification

required
topic_prefix str

topic prefix before device-id, no trailing /

'pqopen/data'
qos int

MQTT quality of service level; valid values are 0, 1, and 2

0
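
The `topic_prefix` and `qos` constraints above can be sketched as two small helpers. Both function names and the `"state"` subtopic are hypothetical; the exact topic layout used by the endpoint is not documented here, only that the prefix precedes the device id and must not end in `/`.

```python
def build_topic(topic_prefix: str, device_id: str, *subtopics: str) -> str:
    """Join prefix, device id and optional subtopics with single slashes."""
    # Strip a trailing slash so a misconfigured prefix does not yield "//".
    parts = [topic_prefix.rstrip("/"), device_id, *subtopics]
    return "/".join(parts)


def validate_qos(qos: int) -> int:
    """Return qos unchanged if it is a valid MQTT QoS level, else raise."""
    if qos not in (0, 1, 2):
        raise ValueError(f"qos must be 0, 1 or 2, got {qos!r}")
    return qos


if __name__ == "__main__":
    print(build_topic("pqopen/data", "device_123", "state"))
```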

write_aggregated_data(data, timestamp_us, interval_seconds)

Write an aggregated data message

Parameters:

Name Type Description Default
data dict

The data object to be sent

required
timestamp_us int

Timestamp (in µs) of the data set

required
interval_seconds int

Aggregation interval, used as a data tag

required

MqttStorageEndpoint

Bases: StorageEndpoint

Represents an MQTT endpoint for transferring data.

__init__(name, measurement_id, device_id, mqtt_host, client_id, topic_prefix='dt/pqopen', compression=True, limit_precision=None)

Create an MQTT storage endpoint

Parameters:

Name Type Description Default
name str

The name of the endpoint

required
measurement_id str

Id of the measurement, included in the transmitted data

required
device_id str

The device Id

required
mqtt_host str

hostname of the MQTT broker.

required
client_id str

name to be used for mqtt client identification

required
topic_prefix str

topic prefix before device-id, no trailing /

'dt/pqopen'
compression bool

Whether the payload is compressed with gzip

True
limit_precision int

Limit float precision to number of decimal places

None
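
The effect of `compression` and `limit_precision` on a payload can be sketched as follows. This is an illustration only: JSON as the serialization format and the helper names `round_floats` and `encode_payload` are assumptions, since the wire format used by MqttStorageEndpoint is not stated here.

```python
import gzip
import json


def round_floats(value, ndigits: int):
    """Recursively round every float in a nested dict/list structure."""
    if isinstance(value, float):
        return round(value, ndigits)
    if isinstance(value, dict):
        return {k: round_floats(v, ndigits) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [round_floats(v, ndigits) for v in value]
    return value


def encode_payload(message: dict, compression: bool = True, limit_precision=None) -> bytes:
    """Serialize a message to bytes, optionally rounding floats and gzipping."""
    if limit_precision is not None:
        message = round_floats(message, limit_precision)
    raw = json.dumps(message).encode("utf-8")  # assumed format: JSON
    return gzip.compress(raw) if compression else raw
```

Limiting precision before compressing tends to shrink the payload, because shorter number strings repeat more often.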

write_aggregated_data(data, timestamp_us, interval_seconds)

Write an aggregated data message

Parameters:

Name Type Description Default
data dict

The data object to be sent

required
timestamp_us int

Timestamp (in µs) of the data set

required
interval_seconds int

Aggregation interval, used as a data tag

required
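
Since `timestamp_us` is an integer microsecond count, converting to and from `datetime` objects is a common need on the consumer side. The sketch below assumes the timestamps are Unix-epoch based and in UTC, which this page does not state explicitly; both helper names are hypothetical.

```python
from datetime import datetime, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_timestamp_us(dt: datetime) -> int:
    """Convert a timezone-aware datetime to integer microseconds since the epoch."""
    delta = dt - _EPOCH
    # Integer arithmetic avoids float rounding at microsecond resolution.
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds


def from_timestamp_us(timestamp_us: int) -> datetime:
    """Convert integer microseconds since the epoch to a UTC datetime."""
    seconds, micros = divmod(timestamp_us, 1_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=micros)
```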

write_data_series(data)

Write a timeseries data message

Parameters:

Name Type Description Default
data dict

The data object to be sent

required

write_event(event)

Write event data message

Parameters:

Name Type Description Default
event Event

The event to be written

required

StorageController

Bases: object

Manages multiple storage plans and processes data for storage.

__init__(time_channel, sample_rate)

Parameters:

Name Type Description Default
time_channel AcqBuffer

The acquisition buffer for timestamps.

required
sample_rate float

The sampling rate in Hz.

required

add_storage_plan(storage_plan)

Adds a storage plan to the controller.

Parameters:

Name Type Description Default
storage_plan StoragePlan

The storage plan to add.

required

process()

Processes data for all storage plans based on the current acquisition state.

process_events(events)

Process events to be stored

Parameters:

Name Type Description Default
events List[Event]

List of events to be stored by each storage plan

required

setup_endpoints_and_storageplans(endpoints, storage_plans, available_channels, measurement_id, device_id, start_timestamp_us)

Set up endpoints and storage plans from config

Parameters:

Name Type Description Default
endpoints dict

Dict of endpoints to be configured (csv and persistmq are supported for now)

required
storage_plans dict

Dict of storage plans to be created

required
available_channels dict

Dict of all available channels

required
measurement_id str

The current measurement id, used to tag the session

required
device_id str

Id of the device, used to uniquely tag the data origin

required
start_timestamp_us int

Timestamp (in µs) of the start of the measurement

required

Raises:

Type Description
NotImplementedError

If an endpoint type that is not implemented is configured
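
The dispatch-and-raise behavior described above can be sketched with a lookup table of factories. This is a generic pattern under stated assumptions, not pqopen's code: `create_endpoint` and the `factories` mapping are hypothetical names.

```python
def create_endpoint(endpoint_type: str, config: dict, factories: dict):
    """Build an endpoint from its config, or raise for unsupported types."""
    try:
        factory = factories[endpoint_type]
    except KeyError:
        raise NotImplementedError(
            f"Endpoint type {endpoint_type!r} is not implemented"
        ) from None
    # The config dict is passed on as keyword arguments to the factory.
    return factory(**config)


if __name__ == "__main__":
    # Hypothetical factory returning a plain tuple instead of a real endpoint.
    factories = {"csv": lambda data_dir: ("csv", data_dir)}
    print(create_endpoint("csv", {"data_dir": "/tmp/"}, factories))
```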

StorageEndpoint

Bases: object

Represents an endpoint for storing data.

__init__(name, measurement_id)

Parameters:

Name Type Description Default
name str

The name of the storage endpoint.

required
measurement_id str

Id of the measurement

required

write_aggregated_data(data, timestamp_us, interval_seconds)

Writes aggregated data to the storage endpoint.

Parameters:

Name Type Description Default
data dict

The aggregated data to store.

required
timestamp_us int

The timestamp in microseconds for the aggregated data.

required
interval_seconds int

The aggregation interval in seconds.

required

write_data_series(data)

Writes a series of data to the storage endpoint.

Parameters:

Name Type Description Default
data dict

The data to be stored, organized by channels.

required

StoragePlan

Bases: object

Defines a plan for storing data with specified intervals and channels.

__init__(storage_endpoint, start_timestamp_us, interval_seconds=10, storage_name='aggregated_data', store_events=False)

Parameters:

Name Type Description Default
storage_endpoint StorageEndpoint

The storage endpoint to use.

required
start_timestamp_us int

Starting timestamp in µs

required
interval_seconds int

The interval for aggregation in seconds.

10
storage_name str

Name of the storage dataset.

'aggregated_data'
store_events bool

Whether events should also be stored

False
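
One plausible way to derive aggregation boundaries from `start_timestamp_us` and `interval_seconds` is to align them to whole multiples of the interval. Whether StoragePlan aligns intervals this way is not stated here, so treat the following as a sketch of the idea; `next_interval_end_us` is a hypothetical helper.

```python
def next_interval_end_us(timestamp_us: int, interval_seconds: int) -> int:
    """Return the first interval boundary strictly after timestamp_us."""
    interval_us = interval_seconds * 1_000_000
    # Floor to the current interval start, then step one interval forward.
    return (timestamp_us // interval_us + 1) * interval_us


if __name__ == "__main__":
    start_us = 1_704_067_203_500_000  # 3.5 s into a 10 s interval
    print(next_interval_end_us(start_us, 10))
```

With this alignment, a plan started mid-interval produces a shorter first interval and regular ones afterwards.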

add_channel(channel)

Adds a data channel to the storage plan.

Parameters:

Name Type Description Default
channel DataChannelBuffer

The channel to add.

required

store_aggregated_data(stop_sidx)

Stores aggregated data from the channels in the storage plan.

Parameters:

Name Type Description Default
stop_sidx int

The stopping sample index for aggregation.

required

store_data_series(time_channel, sample_rate)

Stores a series of data (1:1) from the channels in the storage plan.

Parameters:

Name Type Description Default
time_channel AcqBuffer

The time channel for converting the acq_sidx to real timestamps.

required
sample_rate float

The sampling rate in Hz.

required
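
Converting a sample index to a real timestamp can be sketched by extrapolating from a known reference sample using the sample rate. This is an assumption about the general approach, not a description of how the time channel is actually read; `sidx_to_timestamp_us` is a hypothetical helper.

```python
def sidx_to_timestamp_us(sidx: int, ref_sidx: int, ref_timestamp_us: int,
                         sample_rate: float) -> int:
    """Estimate the timestamp (in µs) of sample sidx from a reference sample."""
    # Samples are assumed to be equally spaced at 1 / sample_rate seconds.
    offset_us = round((sidx - ref_sidx) * 1_000_000 / sample_rate)
    return ref_timestamp_us + offset_us


if __name__ == "__main__":
    # 500 samples after the reference at 1 kHz is 0.5 s later.
    print(sidx_to_timestamp_us(1500, 1000, 2_000_000, 1000.0))
```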

TestStorageEndpoint

Bases: StorageEndpoint

An implementation of StorageEndpoint for testing purposes.
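
A test endpoint typically records what it receives so that assertions can be made afterwards. The sketch below shows that pattern with a stand-in base class, so it runs without pqopen installed; `StorageEndpointStub` and `RecordingEndpoint` are hypothetical names, and how TestStorageEndpoint itself behaves is not documented here.

```python
class StorageEndpointStub:
    """Stand-in mirroring the documented StorageEndpoint interface."""

    def __init__(self, name: str, measurement_id: str):
        self.name = name
        self.measurement_id = measurement_id

    def write_data_series(self, data: dict):
        raise NotImplementedError

    def write_aggregated_data(self, data: dict, timestamp_us: int, interval_seconds: int):
        raise NotImplementedError


class RecordingEndpoint(StorageEndpointStub):
    """Keeps every written data set in memory for later inspection."""

    def __init__(self, name: str, measurement_id: str):
        super().__init__(name, measurement_id)
        self.series = []
        self.aggregated = []

    def write_data_series(self, data: dict):
        self.series.append(data)

    def write_aggregated_data(self, data: dict, timestamp_us: int, interval_seconds: int):
        self.aggregated.append((data, timestamp_us, interval_seconds))
```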