StorageController
The pqopen/storagecontroller.py module provides a flexible framework for managing data storage across multiple endpoints.
It supports various storage methods, including CSV files, MQTT (for IoT integration), and Home Assistant-specific MQTT topics.
The module contains the following key components:
- StorageEndpoint: An abstract base class defining the interface for writing data to different storage endpoints.
- StoragePlan: Manages the configuration for storing data from specific channels at specified intervals.
- StorageController: Orchestrates the processing and storage of data across all configured plans.
Features
- Supports multiple storage endpoints (e.g., CSV, MQTT, Home Assistant).
- Handles both raw time-series data and aggregated data.
- Enables event-driven data storage.
- Designed for flexibility in integrating different storage methods.
Usage
The module is intended for applications that need to store measurement data reliably. It is particularly useful for power quality applications, real-time monitoring systems, and scenarios where data must be stored locally, transmitted to remote services, or both.
Examples:
    from pqopen.storagecontroller import StorageController

    endpoints = {
        "csv": {"data_dir": "/tmp/"},
        "mqtt": {
            "hostname": "localhost",
            "client_id": "pqopen-mqtt",
            "topic_prefix": "dt/pqopen"
        }
    }
    storage_plans = {
        "plan1": {
            "endpoint": "csv",
            "interval_sec": 600,
            "channels": ["channel1", "channel2"]
        },
        "plan2": {
            "endpoint": "mqtt",
            "interval_sec": 10,
            "store_events": True
        }
    }
    # Initialize the StorageController and configure endpoints and plans.
    # time_channel, sample_rate, channels and start_ts are expected to come
    # from the surrounding acquisition setup.
    storage_controller = StorageController(time_channel, sample_rate)
    storage_controller.setup_endpoints_and_storageplans(
        endpoints=endpoints,
        storage_plans=storage_plans,
        available_channels=channels,
        measurement_id="unique_measurement_id",
        device_id="device_123",
        start_timestamp_us=start_ts,
    )
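Because each storage plan refers to its endpoint by key, a typo in the configuration can leave a plan pointing at an endpoint that was never defined. The following is a minimal sketch of a pre-flight check, assuming the dict layout used in the example; `find_unknown_endpoints` is a hypothetical helper and not part of pqopen.

```python
def find_unknown_endpoints(endpoints: dict, storage_plans: dict) -> dict:
    """Return {plan_name: endpoint_key} for plans whose endpoint is not configured.

    Hypothetical helper for illustration only; not part of pqopen.
    """
    unknown = {}
    for plan_name, plan in storage_plans.items():
        endpoint_key = plan.get("endpoint")
        if endpoint_key not in endpoints:
            unknown[plan_name] = endpoint_key
    return unknown


# Only "csv" is configured here, so "plan2" refers to a missing endpoint.
endpoints = {"csv": {"data_dir": "/tmp/"}}
storage_plans = {
    "plan1": {"endpoint": "csv", "interval_sec": 600, "channels": ["channel1"]},
    "plan2": {"endpoint": "mqtt", "interval_sec": 10, "store_events": True},
}
unknown = find_unknown_endpoints(endpoints, storage_plans)
print(unknown)
```

Running such a check before calling `setup_endpoints_and_storageplans` would surface configuration mistakes early, independently of how the controller itself reacts to them.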
CsvStorageEndpoint
Bases: StorageEndpoint
Represents a CSV storage endpoint.
__init__(name, measurement_id, file_path)
Create a CSV storage endpoint.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the endpoint | required |
measurement_id | str | ID of the measurement, which will be included in the transmitted data | required |
file_path | str \| Path | Data path for the CSV file | required |
HomeAssistantStorageEndpoint
Bases: StorageEndpoint
Represents an MQTT endpoint for Home Assistant.
__init__(name, device_id, mqtt_host, client_id, topic_prefix='pqopen/data', qos=0)
Create an endpoint for Home Assistant via MQTT.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the endpoint | required |
device_id | str | The device ID | required |
mqtt_host | str | Hostname of the MQTT broker | required |
client_id | str | Name used for MQTT client identification | required |
topic_prefix | str | Topic prefix placed before the device ID, without a trailing / | 'pqopen/data' |
qos | int | MQTT quality of service; valid values are 0, 1 and 2 | 0 |
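The two constraints stated in the table (no trailing slash on `topic_prefix`, and `qos` limited to 0, 1 or 2) can be checked before an endpoint is created. This is a minimal sketch, assuming such validation is wanted on the caller's side; `normalize_mqtt_settings` is a hypothetical helper, not a pqopen function.

```python
from typing import Tuple


def normalize_mqtt_settings(topic_prefix: str = "pqopen/data", qos: int = 0) -> Tuple[str, int]:
    """Strip trailing slashes from the prefix and reject invalid QoS levels.

    Hypothetical helper for illustration only; not part of pqopen.
    """
    if qos not in (0, 1, 2):
        raise ValueError(f"qos must be 0, 1 or 2, got {qos!r}")
    return topic_prefix.rstrip("/"), qos


prefix, qos = normalize_mqtt_settings("pqopen/data/", qos=1)
print(prefix, qos)
```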
write_aggregated_data(data, timestamp_us, interval_seconds)
Write an aggregated data message
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | dict | The data object to be sent | required |
timestamp_us | int | Timestamp (in µs) of the data set | required |
interval_seconds | int | Aggregation interval, used as data tag | required |
MqttStorageEndpoint
Bases: StorageEndpoint
Represents an MQTT endpoint for transferring data.
__init__(name, measurement_id, device_id, mqtt_host, client_id, topic_prefix='dt/pqopen', compression=True, limit_precision=None)
Create an MQTT storage endpoint.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the endpoint | required |
measurement_id | str | ID of the measurement, which will be included in the transmitted data | required |
device_id | str | The device ID | required |
mqtt_host | str | Hostname of the MQTT broker | required |
client_id | str | Name used for MQTT client identification | required |
topic_prefix | str | Topic prefix placed before the device ID, without a trailing / | 'dt/pqopen' |
compression | bool | Whether the payload should be compressed with gzip | True |
limit_precision | int | Limit float precision to this number of decimal places | None |
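To illustrate what `compression` and `limit_precision` imply for a payload, here is a sketch that rounds floats and optionally gzip-compresses the result. The JSON encoding, the `round_floats` and `encode_payload` helpers, and the channel names are assumptions made for this example; the wire format actually used by MqttStorageEndpoint is not specified in this reference.

```python
import gzip
import json


def round_floats(value, ndigits: int):
    """Recursively round floats in nested dicts and lists; leave other values as-is."""
    if isinstance(value, float):
        return round(value, ndigits)
    if isinstance(value, dict):
        return {key: round_floats(item, ndigits) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [round_floats(item, ndigits) for item in value]
    return value


def encode_payload(data: dict, compression: bool = True, limit_precision=None) -> bytes:
    """Serialize data as JSON (an assumption) and gzip it if requested."""
    if limit_precision is not None:
        data = round_floats(data, limit_precision)
    raw = json.dumps(data).encode("utf-8")
    return gzip.compress(raw) if compression else raw


# Hypothetical channel names, used only to have some data to encode.
payload = encode_payload({"U1_rms": 230.123456, "Freq": 50.0123}, limit_precision=2)
decoded = json.loads(gzip.decompress(payload))
print(decoded)
```

A receiver has to know whether compression was enabled, since a gzip-compressed payload cannot be parsed directly.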
write_aggregated_data(data, timestamp_us, interval_seconds)
Write an aggregated data message
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | dict | The data object to be sent | required |
timestamp_us | int | Timestamp (in µs) of the data set | required |
interval_seconds | int | Aggregation interval, used as data tag | required |
write_data_series(data)
Write a timeseries data message
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | dict | The data object to be sent | required |
write_event(event)
Write event data message
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event | Event | The event to be written | required |
StorageController
Bases: object
Manages multiple storage plans and processes data for storage.
__init__(time_channel, sample_rate)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
time_channel | AcqBuffer | The acquisition buffer for timestamps. | required |
sample_rate | float | The sampling rate in Hz. | required |
add_storage_plan(storage_plan)
Adds a storage plan to the controller.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
storage_plan | StoragePlan | The storage plan to add. | required |
process()
Processes data for all storage plans based on the current acquisition state.
process_events(events)
Process events to be stored
Parameters:
Name | Type | Description | Default |
---|---|---|---|
events | List[Event] | List of events to be stored by each storage plan | required |
setup_endpoints_and_storageplans(endpoints, storage_plans, available_channels, measurement_id, device_id, start_timestamp_us)
Set up endpoints and storage plans from a configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
endpoints | dict | Dict of endpoints to be configured (csv and persistmq are supported for now) | required |
storage_plans | dict | Dict of storage plans to be created | required |
available_channels | dict | List of all available channels | required |
measurement_id | str | The actual measurement ID for tagging the session | required |
device_id | str | ID of the device, used to uniquely tag the data origin | required |
start_timestamp_us | int | Timestamp of the start of the measurement | required |
Raises:
Type | Description |
---|---|
NotImplementedError | If an endpoint type that is not implemented is configured |
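The behaviour described under "Raises" matches a common dispatch pattern: look up the endpoint key and fail loudly when no implementation exists. The sketch below shows that pattern in isolation. The factory functions return plain dicts as stand-ins, and `create_endpoint` and `ENDPOINT_FACTORIES` are hypothetical names; it is not the library's implementation.

```python
# Stand-in factories for illustration; real endpoint objects would be built here.
def _make_csv(config: dict) -> dict:
    return {"type": "csv", **config}


def _make_mqtt(config: dict) -> dict:
    return {"type": "mqtt", **config}


ENDPOINT_FACTORIES = {"csv": _make_csv, "mqtt": _make_mqtt}


def create_endpoint(kind: str, config: dict) -> dict:
    """Dispatch on the endpoint key and raise for unsupported kinds."""
    factory = ENDPOINT_FACTORIES.get(kind)
    if factory is None:
        raise NotImplementedError(f"Endpoint type {kind!r} is not implemented")
    return factory(config)


csv_endpoint = create_endpoint("csv", {"data_dir": "/tmp/"})
print(csv_endpoint)
```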
StorageEndpoint
Bases: object
Represents an endpoint for storing data.
__init__(name, measurement_id)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the storage endpoint. | required |
write_aggregated_data(data, timestamp_us, interval_seconds)
Writes aggregated data to the storage endpoint.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | dict | The aggregated data to store. | required |
timestamp_us | int | The timestamp in microseconds for the aggregated data. | required |
interval_seconds | int | The aggregation interval in seconds. | required |
write_data_series(data)
Writes a series of data to the storage endpoint.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | dict | The data to be stored, organized by channels. | required |
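A custom endpoint would implement the two write methods documented above. The sketch below uses a stand-in base class that mirrors the documented signatures so that it runs without pqopen installed; whether the real base class raises `NotImplementedError` or does nothing by default is an assumption. `InMemoryStorageEndpoint` is a hypothetical example class.

```python
class StorageEndpoint:
    """Stand-in mirroring the documented interface; not the pqopen class."""

    def __init__(self, name: str, measurement_id: str):
        self.name = name
        self.measurement_id = measurement_id

    def write_data_series(self, data: dict):
        raise NotImplementedError

    def write_aggregated_data(self, data: dict, timestamp_us: int, interval_seconds: int):
        raise NotImplementedError


class InMemoryStorageEndpoint(StorageEndpoint):
    """Hypothetical endpoint that keeps records in lists, e.g. for unit tests."""

    def __init__(self, name: str, measurement_id: str):
        super().__init__(name, measurement_id)
        self.series = []
        self.aggregated = []

    def write_data_series(self, data: dict):
        self.series.append(data)

    def write_aggregated_data(self, data: dict, timestamp_us: int, interval_seconds: int):
        self.aggregated.append(
            {"timestamp_us": timestamp_us, "interval_sec": interval_seconds, "data": data}
        )


endpoint = InMemoryStorageEndpoint("memory", "measurement-1")
endpoint.write_aggregated_data({"U1_rms": 230.1}, 1_700_000_000_000_000, 10)
print(endpoint.aggregated)
```

With the real library, the subclass would derive from `pqopen.storagecontroller.StorageEndpoint` instead of the stand-in.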
StoragePlan
Bases: object
Defines a plan for storing data with specified intervals and channels.
__init__(storage_endpoint, start_timestamp_us, interval_seconds=10, storage_name='aggregated_data', store_events=False)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
storage_endpoint | StorageEndpoint | The storage endpoint to use. | required |
start_timestamp_us | int | Starting timestamp in µs | required |
interval_seconds | int | The interval for aggregation in seconds. | 10 |
storage_name | str | Name of the storage dataset. | 'aggregated_data' |
store_events | bool | Whether events should also be stored | False |
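As an illustration of interval-based aggregation, the sketch below computes the end of the aggregation interval that contains a given timestamp. It assumes that intervals are aligned to whole multiples of `interval_seconds` since the epoch, which this reference does not confirm; `next_interval_boundary_us` is a hypothetical helper.

```python
def next_interval_boundary_us(timestamp_us: int, interval_seconds: int) -> int:
    """Return the first interval boundary strictly after timestamp_us.

    Assumes boundaries lie on whole multiples of the interval (illustrative only).
    """
    interval_us = interval_seconds * 1_000_000
    return (timestamp_us // interval_us + 1) * interval_us


start_timestamp_us = 1_700_000_003_500_000  # 3.5 s past a 10 s boundary
boundary_us = next_interval_boundary_us(start_timestamp_us, interval_seconds=10)
print(boundary_us)
```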
add_channel(channel)
Adds a data channel to the storage plan.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
channel | DataChannelBuffer | The channel to add. | required |
store_aggregated_data(stop_sidx)
Stores aggregated data from the channels in the storage plan.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stop_sidx | int | The stopping sample index for aggregation. | required |
store_data_series(time_channel, sample_rate)
Stores a series of data (1:1) from the channels in the storage plan.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
time_channel | AcqBuffer | The time channel for converting the acq_sidx to real timestamps. | required |
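In the library, the time channel performs the conversion from sample index to timestamp. The sketch below only illustrates the underlying arithmetic under the simplifying assumption of uniform sampling from a known start timestamp; `sidx_to_timestamp_us` is a hypothetical helper and does not use an `AcqBuffer`.

```python
def sidx_to_timestamp_us(sidx: int, start_timestamp_us: int, sample_rate: float) -> int:
    """Map a sample index to a timestamp in µs, assuming uniform sampling."""
    return start_timestamp_us + round(sidx * 1_000_000 / sample_rate)


# 5000 samples at 10 kHz correspond to 0.5 s after the start.
timestamp_us = sidx_to_timestamp_us(5000, start_timestamp_us=1_000_000, sample_rate=10_000.0)
print(timestamp_us)
```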