This is an automated email from the ASF dual-hosted git repository.

oehler pushed a commit to branch 
3733-create-api-endpoint-to-upload-time-series-data
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to 
refs/heads/3733-create-api-endpoint-to-upload-time-series-data by this push:
     new c17dda4195 Add endpoint to store measurements
c17dda4195 is described below

commit c17dda4195c096b0bee8beaf3d92f6da78922286
Author: Sven Oehler <[email protected]>
AuthorDate: Mon Aug 18 17:03:55 2025 +0200

    Add endpoint to store measurements
---
 .../rest/impl/datalake/DataLakeDataWriter.java     | 102 +++++++++++++++++++++
 .../rest/impl/datalake/DataLakeResource.java       |  35 +++++++
 2 files changed, 137 insertions(+)

diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
new file mode 100644
index 0000000000..c658d723e8
--- /dev/null
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.rest.impl.datalake;
+
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataexplorer.TimeSeriesStore;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.datalake.DataSeries;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class DataLakeDataWriter {
+
+  private final IDataExplorerSchemaManagement dataExplorerSchemaManagement;
+
+  public DataLakeDataWriter(IDataExplorerSchemaManagement 
dataExplorerSchemaManagement) {
+    this.dataExplorerSchemaManagement = dataExplorerSchemaManagement;
+  }
+
+  public void writeData(String measurementID, SpQueryResult queryResult) {
+    var measure = dataExplorerSchemaManagement.getById(measurementID);
+    var dataSeries = getDataSeries(queryResult);
+    getTimeSeriesStoreAndPersistQueryResult(dataSeries, measure);
+  }
+
+  private void getTimeSeriesStoreAndPersistQueryResult(DataSeries dataSeries,
+                                                       DataLakeMeasure 
measure){
+    var timeSeriesStore = getTimeSeriesStore(measure);
+    for (var row : dataSeries.getRows()) {
+      var event = rowToEvent(row, dataSeries.getHeaders());
+      renameTimestampField(event, measure.getTimestampField());
+      timeSeriesStore.onEvent(event);
+    }
+    timeSeriesStore.close();
+  }
+
+  private TimeSeriesStore getTimeSeriesStore(DataLakeMeasure measure){
+    return new TimeSeriesStore(
+        new DataExplorerDispatcher().getDataExplorerManager()
+            .getTimeseriesStorage(measure, false),
+        measure,
+        Environments.getEnvironment(),
+        true
+    );
+  }
+
+  private DataSeries getDataSeries(SpQueryResult queryResult) {
+    if (queryResult.getAllDataSeries().size() == 1) {
+      return queryResult.getAllDataSeries().get(0);
+    } else {
+      throw new SpRuntimeException("SpQueryResult must contain exactly one 
data series");
+    }
+  }
+
+  private String getSubstringAfterColons(String input) {
+    int index = input.indexOf("::");
+    if (index != -1) {
+      return input.substring(index + 2);
+    }
+    return input;
+  }
+
+  private Event rowToEvent(List<Object> row, List<String> headers){
+    Map<String, Object> eventMap = IntStream.range(0, headers.size())
+        .boxed()
+        .collect(Collectors.toMap(headers::get, row::get));
+    return EventFactory.fromMap(eventMap);
+  }
+
+  private void renameTimestampField(Event event, String timestampField){
+    var strippedTime = getSubstringAfterColons(timestampField);
+    event.addField(timestampField, 
event.getFieldByRuntimeName(strippedTime).getRawValue());
+  }
+
+}
+
+
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
index c757a61688..78b4469018 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
@@ -18,9 +18,13 @@
 
 package org.apache.streampipes.rest.impl.datalake;
 
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataexplorer.TimeSeriesStore;
 import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
 import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
 import org.apache.streampipes.dataexplorer.export.OutputFormat;
+import org.apache.streampipes.dataexplorer.influx.client.InfluxClientProvider;
 import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.datalake.DataSeries;
@@ -28,6 +32,7 @@ import org.apache.streampipes.model.datalake.SpQueryResult;
 import org.apache.streampipes.model.datalake.param.ProvidedRestQueryParams;
 import org.apache.streampipes.model.message.Notifications;
 import org.apache.streampipes.model.monitoring.SpLogMessage;
+import org.apache.streampipes.model.runtime.EventFactory;
 import org.apache.streampipes.rest.core.base.impl.AbstractRestResource;
 import org.apache.streampipes.rest.shared.exception.SpMessageException;
 
@@ -38,6 +43,9 @@ import io.swagger.v3.oas.annotations.media.ArraySchema;
 import io.swagger.v3.oas.annotations.media.Content;
 import io.swagger.v3.oas.annotations.media.Schema;
 import io.swagger.v3.oas.annotations.responses.ApiResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
@@ -52,10 +60,12 @@ import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 import 
org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static 
org.apache.streampipes.model.datalake.param.SupportedRestQueryParams.QP_AGGREGATION_FUNCTION;
 import static 
org.apache.streampipes.model.datalake.param.SupportedRestQueryParams.QP_AUTO_AGGREGATE;
@@ -84,6 +94,7 @@ import static 
org.apache.streampipes.model.datalake.param.SupportedRestQueryPara
 @RequestMapping("/api/v4/datalake")
 public class DataLakeResource extends AbstractRestResource {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataLakeResource.class);
   private final IDataExplorerQueryManagement dataExplorerQueryManagement;
   private final IDataExplorerSchemaManagement dataExplorerSchemaManagement;
 
@@ -356,6 +367,30 @@ public class DataLakeResource extends AbstractRestResource 
{
     }
   }
 
+  @PostMapping(
+      path = "/measurements/{measurementID}",
+      produces = MediaType.APPLICATION_JSON_VALUE,
+      consumes = MediaType.APPLICATION_JSON_VALUE)
+  @Operation(summary = "Store a measurement series to a data lake with the 
given id", tags = {"Data Lake"},
+      responses = {
+          @ApiResponse(
+              responseCode = "400",
+              description = "Can't store the given data to this data lake"),
+          @ApiResponse(
+              responseCode = "200",
+              description = "Successfully stored data")})
+  public ResponseEntity<?> storeDataToMeasurement(@PathVariable String 
measurementID,
+                                                  @RequestBody SpQueryResult 
queryResult) {
+    var dataWriter = new DataLakeDataWriter(dataExplorerSchemaManagement);
+    try {
+      dataWriter.writeData(measurementID, queryResult);
+    } catch (SpRuntimeException e) {
+      LOG.warn("Could not store event", e);
+      return badRequest(Notifications.error("Could not store event for 
measurement " + measurementID));
+    }
+    return ok();
+  }
+
   @DeleteMapping(path = "/measurements")
   @Operation(summary = "Remove all stored measurement series from Data Lake", 
tags = {"Data Lake"},
       responses = {

Reply via email to