This is an automated email from the ASF dual-hosted git repository.
oehler pushed a commit to branch
3733-create-api-endpoint-to-upload-time-series-data
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3733-create-api-endpoint-to-upload-time-series-data by this push:
new c17dda4195 Add endpoint to store measurements
c17dda4195 is described below
commit c17dda4195c096b0bee8beaf3d92f6da78922286
Author: Sven Oehler <[email protected]>
AuthorDate: Mon Aug 18 17:03:55 2025 +0200
Add endpoint to store measurements
---
.../rest/impl/datalake/DataLakeDataWriter.java | 102 +++++++++++++++++++++
.../rest/impl/datalake/DataLakeResource.java | 35 +++++++
2 files changed, 137 insertions(+)
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
new file mode 100644
index 0000000000..c658d723e8
--- /dev/null
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.rest.impl.datalake;
+
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataexplorer.TimeSeriesStore;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.datalake.DataSeries;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class DataLakeDataWriter {
+
+ private final IDataExplorerSchemaManagement dataExplorerSchemaManagement;
+
+ public DataLakeDataWriter(IDataExplorerSchemaManagement
dataExplorerSchemaManagement) {
+ this.dataExplorerSchemaManagement = dataExplorerSchemaManagement;
+ }
+
+ public void writeData(String measurementID, SpQueryResult queryResult) {
+ var measure = dataExplorerSchemaManagement.getById(measurementID);
+ var dataSeries = getDataSeries(queryResult);
+ getTimeSeriesStoreAndPersistQueryResult(dataSeries, measure);
+ }
+
+ private void getTimeSeriesStoreAndPersistQueryResult(DataSeries dataSeries,
+ DataLakeMeasure
measure){
+ var timeSeriesStore = getTimeSeriesStore(measure);
+ for (var row : dataSeries.getRows()) {
+ var event = rowToEvent(row, dataSeries.getHeaders());
+ renameTimestampField(event, measure.getTimestampField());
+ timeSeriesStore.onEvent(event);
+ }
+ timeSeriesStore.close();
+ }
+
+ private TimeSeriesStore getTimeSeriesStore(DataLakeMeasure measure){
+ return new TimeSeriesStore(
+ new DataExplorerDispatcher().getDataExplorerManager()
+ .getTimeseriesStorage(measure, false),
+ measure,
+ Environments.getEnvironment(),
+ true
+ );
+ }
+
+ private DataSeries getDataSeries(SpQueryResult queryResult) {
+ if (queryResult.getAllDataSeries().size() == 1) {
+ return queryResult.getAllDataSeries().get(0);
+ } else {
+ throw new SpRuntimeException("SpQueryResult must contain exactly one
data series");
+ }
+ }
+
+ private String getSubstringAfterColons(String input) {
+ int index = input.indexOf("::");
+ if (index != -1) {
+ return input.substring(index + 2);
+ }
+ return input;
+ }
+
+ private Event rowToEvent(List<Object> row, List<String> headers){
+ Map<String, Object> eventMap = IntStream.range(0, headers.size())
+ .boxed()
+ .collect(Collectors.toMap(headers::get, row::get));
+ return EventFactory.fromMap(eventMap);
+ }
+
+ private void renameTimestampField(Event event, String timestampField){
+ var strippedTime = getSubstringAfterColons(timestampField);
+ event.addField(timestampField,
event.getFieldByRuntimeName(strippedTime).getRawValue());
+ }
+
+}
+
+
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
index c757a61688..78b4469018 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java
@@ -18,9 +18,13 @@
package org.apache.streampipes.rest.impl.datalake;
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataexplorer.TimeSeriesStore;
import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
import org.apache.streampipes.dataexplorer.export.OutputFormat;
+import org.apache.streampipes.dataexplorer.influx.client.InfluxClientProvider;
import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.datalake.DataSeries;
@@ -28,6 +32,7 @@ import org.apache.streampipes.model.datalake.SpQueryResult;
import org.apache.streampipes.model.datalake.param.ProvidedRestQueryParams;
import org.apache.streampipes.model.message.Notifications;
import org.apache.streampipes.model.monitoring.SpLogMessage;
+import org.apache.streampipes.model.runtime.EventFactory;
import org.apache.streampipes.rest.core.base.impl.AbstractRestResource;
import org.apache.streampipes.rest.shared.exception.SpMessageException;
@@ -38,6 +43,9 @@ import io.swagger.v3.oas.annotations.media.ArraySchema;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
@@ -52,10 +60,12 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import
org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static
org.apache.streampipes.model.datalake.param.SupportedRestQueryParams.QP_AGGREGATION_FUNCTION;
import static
org.apache.streampipes.model.datalake.param.SupportedRestQueryParams.QP_AUTO_AGGREGATE;
@@ -84,6 +94,7 @@ import static
org.apache.streampipes.model.datalake.param.SupportedRestQueryPara
@RequestMapping("/api/v4/datalake")
public class DataLakeResource extends AbstractRestResource {
+ private static final Logger LOG =
LoggerFactory.getLogger(DataLakeResource.class);
private final IDataExplorerQueryManagement dataExplorerQueryManagement;
private final IDataExplorerSchemaManagement dataExplorerSchemaManagement;
@@ -356,6 +367,30 @@ public class DataLakeResource extends AbstractRestResource
{
}
}
+ @PostMapping(
+ path = "/measurements/{measurementID}",
+ produces = MediaType.APPLICATION_JSON_VALUE,
+ consumes = MediaType.APPLICATION_JSON_VALUE)
+ @Operation(summary = "Store a measurement series to a data lake with the
given id", tags = {"Data Lake"},
+ responses = {
+ @ApiResponse(
+ responseCode = "400",
+ description = "Can't store the given data to this data lake"),
+ @ApiResponse(
+ responseCode = "200",
+ description = "Successfully stored data")})
+ public ResponseEntity<?> storeDataToMeasurement(@PathVariable String
measurementID,
+ @RequestBody SpQueryResult
queryResult) {
+ var dataWriter = new DataLakeDataWriter(dataExplorerSchemaManagement);
+ try {
+ dataWriter.writeData(measurementID, queryResult);
+ } catch (SpRuntimeException e) {
+ LOG.warn("Could not store event", e);
+ return badRequest(Notifications.error("Could not store event for
measurement " + measurementID));
+ }
+ return ok();
+ }
+
@DeleteMapping(path = "/measurements")
@Operation(summary = "Remove all stored measurement series from Data Lake",
tags = {"Data Lake"},
responses = {