This is an automated email from the ASF dual-hosted git repository.

oehler pushed a commit to branch 
3733-create-api-endpoint-to-upload-time-series-data
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to 
refs/heads/3733-create-api-endpoint-to-upload-time-series-data by this push:
     new cb58c61ba8 Add chunking when sending data
cb58c61ba8 is described below

commit cb58c61ba8c21e74e7d3b9c2c369bbd229b923bf
Author: Sven Oehler <[email protected]>
AuthorDate: Tue Aug 19 18:08:34 2025 +0200

    Add chunking when sending data
---
 .../streampipes/endpoint/api/data_lake_measure.py  | 36 ++++++++++++++--------
 .../streampipes/model/resource/query_result.py     |  2 ++
 2 files changed, 26 insertions(+), 12 deletions(-)

diff --git 
a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py 
b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
index 0b53e8f1fc..629708a680 100644
--- a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
@@ -21,6 +21,7 @@ This endpoint allows to consume data stored in StreamPipes' 
data lake.
 """
 from datetime import datetime
 from json import dumps
+from math import ceil
 from typing import Any, Dict, List, Literal, Optional, Tuple, Type
 
 from pandas import DataFrame
@@ -381,12 +382,14 @@ class DataLakeMeasureEndpoint(APIEndpoint):
         response = 
self._make_request(request_method=self._parent_client.request_session.get, 
url=url)
         return self._resource_cls(**response.json())
 
-    def storeDataToMeasurement(self, identifier: str, df: DataFrame, 
ignore_schema_mismatch=False) -> None:
+    def storeDataToMeasurement(
+        self, identifier: str, df: DataFrame, ignore_schema_mismatch=False, 
batch_size: int = 10000
+    ) -> None:
         """Stores data from a pandas DataFrame into the specified data lake 
measurement.
 
-        The provided DataFrame will be converted into a `QueryResult` using
-        `QueryResult.from_pandas` and then serialized to JSON before being sent
-        to the StreamPipes Data Lake. The data will be appended to the 
measurement
+        The provided DataFrame is split into chunks. Each chunk is converted
+        into a `QueryResult`, serialized to JSON and sent to the StreamPipes
+        Data Lake separately. The data will be appended to the measurement
         identified by `identifier`.
 
         Parameters
@@ -398,6 +401,9 @@ class DataLakeMeasureEndpoint(APIEndpoint):
             must be `timestamp` and all timestamp values will be cast to 
integers.
         ignore_schema_mismatch: bool
             Defines if mismatching events should be stored.
+        batch_size: int
+            The maximum number of rows per chunk into which the data is split.
+            This ensures that requests remain reasonably small.
 
         Returns
         -------
@@ -414,11 +420,17 @@ class DataLakeMeasureEndpoint(APIEndpoint):
         client.dataLakeMeasureApi.storeDataToMeasurement("my-measure-id", df)
         ```
         """
-        query_result = QueryResult.from_pandas(df)
-        self._make_request(
-            request_method=self._parent_client.request_session.post,
-            url=f"{self.build_url()}/{identifier}",
-            params={"ignoreSchemaMismatch": ignore_schema_mismatch},
-            data=dumps(query_result.to_dict(use_source_names=True)),
-            headers={"Content-type": "application/json"},
-        )
+
+        num_chunks = ceil(len(df) / batch_size)
+        for i in range(num_chunks):
+            start = i * batch_size
+            end = (i + 1) * batch_size
+            chunk = df.iloc[start:end].copy()
+            query_result = QueryResult.from_pandas(chunk)
+            self._make_request(
+                request_method=self._parent_client.request_session.post,
+                url=f"{self.build_url()}/{identifier}",
+                params={"ignoreSchemaMismatch": ignore_schema_mismatch},
+                data=dumps(query_result.to_dict(use_source_names=True)),
+                headers={"Content-type": "application/json"},
+            )
diff --git 
a/streampipes-client-python/streampipes/model/resource/query_result.py 
b/streampipes-client-python/streampipes/model/resource/query_result.py
index 049992d182..1b251e0017 100644
--- a/streampipes-client-python/streampipes/model/resource/query_result.py
+++ b/streampipes-client-python/streampipes/model/resource/query_result.py
@@ -120,6 +120,8 @@ class QueryResult(Resource):
         """
         if df.empty:
             raise ValueError("Cannot create QueryResult from an empty 
DataFrame")
+        if df.isna().any().any():
+            raise ValueError("Cannot create QueryResult from a DataFrame with 
NaN values")
 
         headers = df.columns.to_list()
         if headers[0] != "timestamp":

Reply via email to