This is an automated email from the ASF dual-hosted git repository.
oehler pushed a commit to branch
3733-create-api-endpoint-to-upload-time-series-data
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3733-create-api-endpoint-to-upload-time-series-data by this push:
new cb58c61ba8 Add chunking when sending data
cb58c61ba8 is described below
commit cb58c61ba8c21e74e7d3b9c2c369bbd229b923bf
Author: Sven Oehler <[email protected]>
AuthorDate: Tue Aug 19 18:08:34 2025 +0200
Add chunking when sending data
---
.../streampipes/endpoint/api/data_lake_measure.py | 36 ++++++++++++++--------
.../streampipes/model/resource/query_result.py | 2 ++
2 files changed, 26 insertions(+), 12 deletions(-)
diff --git
a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
index 0b53e8f1fc..629708a680 100644
--- a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
@@ -21,6 +21,7 @@ This endpoint allows to consume data stored in StreamPipes'
data lake.
"""
from datetime import datetime
from json import dumps
+from math import ceil
from typing import Any, Dict, List, Literal, Optional, Tuple, Type
from pandas import DataFrame
@@ -381,12 +382,14 @@ class DataLakeMeasureEndpoint(APIEndpoint):
response =
self._make_request(request_method=self._parent_client.request_session.get,
url=url)
return self._resource_cls(**response.json())
- def storeDataToMeasurement(self, identifier: str, df: DataFrame,
ignore_schema_mismatch=False) -> None:
+ def storeDataToMeasurement(
+ self, identifier: str, df: DataFrame, ignore_schema_mismatch=False,
batch_size: int = 10000
+ ) -> None:
"""Stores data from a pandas DataFrame into the specified data lake
measurement.
- The provided DataFrame will be converted into a `QueryResult` using
- `QueryResult.from_pandas` and then serialized to JSON before being sent
- to the StreamPipes Data Lake. The data will be appended to the
measurement
+ The provided DataFrame will be split into chunks; each chunk is
+ converted into a `QueryResult` and serialized to JSON before being
+ sent to the StreamPipes Data Lake. The data will be appended to the measurement
identified by `identifier`.
Parameters
@@ -398,6 +401,9 @@ class DataLakeMeasureEndpoint(APIEndpoint):
must be `timestamp` and all timestamp values will be cast to
integers.
ignore_schema_mismatch: bool
Defines if mismatching events should be stored.
+ batch_size: int
+ The size of the chunks in which the data gets split. This ensures
+ that requests remain reasonably small.
Returns
-------
@@ -414,11 +420,17 @@ class DataLakeMeasureEndpoint(APIEndpoint):
client.dataLakeMeasureApi.storeDataToMeasurement("my-measure-id", df)
```
"""
- query_result = QueryResult.from_pandas(df)
- self._make_request(
- request_method=self._parent_client.request_session.post,
- url=f"{self.build_url()}/{identifier}",
- params={"ignoreSchemaMismatch": ignore_schema_mismatch},
- data=dumps(query_result.to_dict(use_source_names=True)),
- headers={"Content-type": "application/json"},
- )
+
+ num_chunks = ceil(len(df) / batch_size)
+ for i in range(num_chunks):
+ start = i * batch_size
+ end = (i + 1) * batch_size
+ chunk = df.iloc[start:end].copy()
+ query_result = QueryResult.from_pandas(chunk)
+ self._make_request(
+ request_method=self._parent_client.request_session.post,
+ url=f"{self.build_url()}/{identifier}",
+ params={"ignoreSchemaMismatch": ignore_schema_mismatch},
+ data=dumps(query_result.to_dict(use_source_names=True)),
+ headers={"Content-type": "application/json"},
+ )
diff --git
a/streampipes-client-python/streampipes/model/resource/query_result.py
b/streampipes-client-python/streampipes/model/resource/query_result.py
index 049992d182..1b251e0017 100644
--- a/streampipes-client-python/streampipes/model/resource/query_result.py
+++ b/streampipes-client-python/streampipes/model/resource/query_result.py
@@ -120,6 +120,8 @@ class QueryResult(Resource):
"""
if df.empty:
raise ValueError("Cannot create QueryResult from an empty
DataFrame")
+ if df.isna().any().any():
+ raise ValueError("Cannot create QueryResult from a DataFrame with
NaN values")
headers = df.columns.to_list()
if headers[0] != "timestamp":