This is an automated email from the ASF dual-hosted git repository. riemer pushed a commit to branch add-manual-data-lake-dimensions-selection in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 790dc86103402ea559be320e1fe13474165d439e Author: Dominik Riemer <[email protected]> AuthorDate: Mon Aug 12 15:54:38 2024 +0200 feat: Support manual assignments of dimensions in data lake sink --- .../api/extractor/IParameterExtractor.java | 2 + .../jvm/InternalSinksExtensionModuleExports.java | 5 +- .../jvm/datalake/DataLakeDimensionProvider.java | 70 ++++++++++++++++ .../sinks/internal/jvm/datalake/DataLakeSink.java | 76 +++++++++++------ .../migrations/DataLakeSinkMigrationV2.java | 58 +++++++++++++ .../documentation.md | 10 +++ .../strings.en | 3 + .../migrations/DataLakeSinkMigrationV2Test.java | 95 ++++++++++++++++++++++ .../sdk/extractor/AbstractParameterExtractor.java | 6 ++ 9 files changed, 301 insertions(+), 24 deletions(-) diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IParameterExtractor.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IParameterExtractor.java index dbcdb4eba4..8efad70197 100644 --- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IParameterExtractor.java +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/extractor/IParameterExtractor.java @@ -95,4 +95,6 @@ public interface IParameterExtractor { List<String> getEventPropertiesSelectorByScope(PropertyScope scope); List<EventProperty> getEventPropertiesByScope(PropertyScope scope); + + List<EventProperty> getInputEventProperties(int streamIndex); } diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/InternalSinksExtensionModuleExports.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/InternalSinksExtensionModuleExports.java index 8dba55f9aa..72bcbcb7d1 100644 --- 
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/InternalSinksExtensionModuleExports.java +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/InternalSinksExtensionModuleExports.java @@ -24,6 +24,7 @@ import org.apache.streampipes.extensions.api.migration.IModelMigrator; import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement; import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeSink; import org.apache.streampipes.sinks.internal.jvm.datalake.migrations.DataLakeSinkMigrationV1; +import org.apache.streampipes.sinks.internal.jvm.datalake.migrations.DataLakeSinkMigrationV2; import org.apache.streampipes.sinks.internal.jvm.notification.InternalStreamPipesNotificationSink; import java.util.Collections; @@ -45,6 +46,8 @@ public class InternalSinksExtensionModuleExports implements IExtensionModuleExpo @Override public List<IModelMigrator<?, ?>> migrators() { - return List.of(new DataLakeSinkMigrationV1()); + return List.of( + new DataLakeSinkMigrationV1(), + new DataLakeSinkMigrationV2()); } } diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeDimensionProvider.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeDimensionProvider.java new file mode 100644 index 0000000000..05c02730ee --- /dev/null +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeDimensionProvider.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.sinks.internal.jvm.datalake; + +import org.apache.streampipes.model.schema.EventProperty; +import org.apache.streampipes.model.schema.EventPropertyPrimitive; +import org.apache.streampipes.model.schema.PropertyScope; +import org.apache.streampipes.model.staticproperty.Option; +import org.apache.streampipes.model.staticproperty.RuntimeResolvableAnyStaticProperty; +import org.apache.streampipes.sdk.utils.Datatypes; +import org.apache.streampipes.vocabulary.SO; + +import java.net.URI; +import java.util.List; + +public class DataLakeDimensionProvider { + + public void applyOptions(List<EventProperty> inputFields, + RuntimeResolvableAnyStaticProperty staticProperty) { + var primitiveFields = getPrimitiveFields(inputFields); + primitiveFields + .forEach(field -> addFieldIfNotExists(field, staticProperty.getOptions())); + staticProperty.getOptions().removeIf(o -> !existsInFields(o, primitiveFields)); + } + + private List<EventPropertyPrimitive> getPrimitiveFields(List<EventProperty> inputFields) { + return inputFields + .stream() + .filter(field -> field instanceof EventPropertyPrimitive) + .filter(field -> satisfiesFilter((EventPropertyPrimitive) field)) + .map(field -> (EventPropertyPrimitive) field) + .toList(); + } + + private boolean satisfiesFilter(EventPropertyPrimitive field) { + return !field.getRuntimeType().equals(Datatypes.Float.toString()) + 
&& !(field.getDomainProperties().stream().map(URI::toString).toList().contains(SO.DATE_TIME)); + } + + private void addFieldIfNotExists(EventPropertyPrimitive field, + List<Option> options) { + if (options.stream().noneMatch(o -> o.getName().equals(field.getRuntimeName()))) { + options.add(new Option( + field.getRuntimeName(), + PropertyScope.valueOf(field.getPropertyScope()) == PropertyScope.DIMENSION_PROPERTY) + ); + } + } + + private boolean existsInFields(Option o, + List<EventPropertyPrimitive> primitiveFields) { + return primitiveFields.stream().anyMatch(field -> field.getRuntimeName().equals(o.getName())); + } +} diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java index ef88bbdcdc..5417a17dd7 100644 --- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java @@ -22,14 +22,19 @@ import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.dataexplorer.TimeSeriesStore; import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher; +import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; +import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig; import org.apache.streampipes.model.DataSinkType; import org.apache.streampipes.model.datalake.DataLakeMeasure; import org.apache.streampipes.model.datalake.DataLakeMeasureSchemaUpdateStrategy; import 
org.apache.streampipes.model.extensions.ExtensionAssetType; import org.apache.streampipes.model.graph.DataSinkDescription; import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.model.schema.EventSchema; import org.apache.streampipes.model.schema.PropertyScope; +import org.apache.streampipes.model.staticproperty.RuntimeResolvableAnyStaticProperty; +import org.apache.streampipes.model.staticproperty.StaticProperty; import org.apache.streampipes.sdk.builder.DataSinkBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; import org.apache.streampipes.sdk.helpers.EpRequirements; @@ -39,15 +44,20 @@ import org.apache.streampipes.sdk.helpers.Options; import org.apache.streampipes.wrapper.params.compat.SinkParams; import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink; -public class DataLakeSink extends StreamPipesDataSink { +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DataLakeSink extends StreamPipesDataSink implements SupportsRuntimeConfig { private static final String DATABASE_MEASUREMENT_KEY = "db_measurement"; private static final String TIMESTAMP_MAPPING_KEY = "timestamp_mapping"; public static final String SCHEMA_UPDATE_KEY = "schema_update"; + public static final String DIMENSIONS_KEY = "dimensions_selection"; public static final String SCHEMA_UPDATE_OPTION = "Update schema"; public static final String EXTEND_EXISTING_SCHEMA_OPTION = "Extend existing schema"; + private static final Logger LOG = LoggerFactory.getLogger(DataLakeSink.class); private TimeSeriesStore timeSeriesStore; @@ -55,25 +65,25 @@ public class DataLakeSink extends StreamPipesDataSink { @Override public DataSinkDescription declareModel() { return DataSinkBuilder - .create("org.apache.streampipes.sinks.internal.jvm.datalake", 1) - .withLocales(Locales.EN) - .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) - .category(DataSinkType.INTERNAL) - .requiredStream(StreamRequirementsBuilder - 
.create() - .requiredPropertyWithUnaryMapping( - EpRequirements.timestampReq(), - Labels.withId(TIMESTAMP_MAPPING_KEY), - PropertyScope.NONE - ) - .build()) - .requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY)) - .requiredSingleValueSelection( - Labels.withId(SCHEMA_UPDATE_KEY), - Options.from(SCHEMA_UPDATE_OPTION, EXTEND_EXISTING_SCHEMA_OPTION) - ) - - .build(); + .create("org.apache.streampipes.sinks.internal.jvm.datalake", 2) + .withLocales(Locales.EN) + .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) + .category(DataSinkType.INTERNAL) + .requiredStream(StreamRequirementsBuilder + .create() + .requiredPropertyWithUnaryMapping( + EpRequirements.timestampReq(), + Labels.withId(TIMESTAMP_MAPPING_KEY), + PropertyScope.NONE + ) + .build()) + .requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY)) + .requiredSingleValueSelection( + Labels.withId(SCHEMA_UPDATE_KEY), + Options.from(SCHEMA_UPDATE_OPTION, EXTEND_EXISTING_SCHEMA_OPTION) + ) + .requiredMultiValueSelectionFromContainer(Labels.withId(DIMENSIONS_KEY)) + .build(); } @Override @@ -81,9 +91,20 @@ public class DataLakeSink extends StreamPipesDataSink { var extractor = parameters.extractor(); var timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY); var measureName = extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY, String.class); - var eventSchema = parameters.getInputSchemaInfos() - .get(0) - .getEventSchema(); + var dimensions = extractor.selectedMultiValues(DIMENSIONS_KEY, String.class); + var eventSchema = new EventSchema(parameters.getInputSchemaInfos() + .get(0) + .getEventSchema() + .getEventProperties() + .stream() + .peek(ep -> { + if (dimensions.contains(ep.getRuntimeName())) { + LOG.info("Using {} as dimension", ep.getRuntimeName()); + ep.setPropertyScope(PropertyScope.DIMENSION_PROPERTY.name()); + } + }) + .toList() + ); var measure = new DataLakeMeasure(measureName, timestampField, eventSchema); @@ -118,4 +139,13 @@ public class 
DataLakeSink extends StreamPipesDataSink { public void onDetach() throws SpRuntimeException { this.timeSeriesStore.close(); } + + @Override + public StaticProperty resolveConfiguration(String staticPropertyInternalName, + IStaticPropertyExtractor extractor) { + var staticProperty = extractor.getStaticPropertyByName(DIMENSIONS_KEY, RuntimeResolvableAnyStaticProperty.class); + var inputFields = extractor.getInputEventProperties(0); + new DataLakeDimensionProvider().applyOptions(inputFields, staticProperty); + return staticProperty; + } } diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV2.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV2.java new file mode 100644 index 0000000000..d47bf06037 --- /dev/null +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV2.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.streampipes.sinks.internal.jvm.datalake.migrations; + +import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor; +import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; +import org.apache.streampipes.model.graph.DataSinkInvocation; +import org.apache.streampipes.model.migration.MigrationResult; +import org.apache.streampipes.model.migration.ModelMigratorConfig; +import org.apache.streampipes.model.staticproperty.RuntimeResolvableAnyStaticProperty; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeDimensionProvider; +import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeSink; + +public class DataLakeSinkMigrationV2 implements IDataSinkMigrator { + @Override + public ModelMigratorConfig config() { + return new ModelMigratorConfig( + "org.apache.streampipes.sinks.internal.jvm.datalake", + SpServiceTagPrefix.DATA_SINK, + 1, + 2 + ); + } + + @Override + public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation element, + IDataSinkParameterExtractor extractor) throws RuntimeException { + var label = Labels.from(DataLakeSink.DIMENSIONS_KEY, "Dimensions", "Selected fields will be stored as dimensions."); + var staticProperty = new RuntimeResolvableAnyStaticProperty( + label.getInternalId(), + label.getLabel(), + label.getDescription() + ); + var inputFields = element.getInputStreams().get(0).getEventSchema().getEventProperties(); + new DataLakeDimensionProvider().applyOptions(inputFields, staticProperty); + + element.getStaticProperties().add(staticProperty); + return MigrationResult.success(element); + } +} diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/documentation.md 
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/documentation.md index 2d58b7e3d9..d87bc9a6e0 100644 --- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/documentation.md +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/documentation.md @@ -41,6 +41,16 @@ This sink requires an event that provides a timestamp value (a field that is mar ## Configuration +### Dimensions + +The fields which will be stored as dimensional values in the time series storage. Dimensions are typically identifiers +such as the ID of a sensor. +Dimensions support grouping in the data explorer, but will be converted to a text-based field and provide less advanced +filtering capabilities. + +Be careful when modifying dimensions of existing pipelines! This might have an impact on how you are able to view data in +the data explorer due to schema incompatibilities. + ### Identifier The name of the measurement (table) where the events are stored. 
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en index 0b604bdfd9..b6ea6ef6aa 100644 --- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en @@ -26,3 +26,6 @@ timestamp_mapping.description=The value which contains a timestamp schema_update.title=Schema Update schema_update.description=Update existing schemas with the new one or extend the existing schema with new properties + +dimensions_selection.key=Dimensions +dimensions_selection.description=Selected fields will be stored as dimensions. diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/test/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV2Test.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/test/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV2Test.java new file mode 100644 index 0000000000..69d696da07 --- /dev/null +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/test/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV2Test.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.sinks.internal.jvm.datalake.migrations; + +import org.apache.streampipes.model.SpDataStream; +import org.apache.streampipes.model.graph.DataSinkInvocation; +import org.apache.streampipes.model.migration.MigrationResult; +import org.apache.streampipes.model.schema.EventSchema; +import org.apache.streampipes.model.schema.PropertyScope; +import org.apache.streampipes.model.staticproperty.RuntimeResolvableAnyStaticProperty; +import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder; +import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor; +import org.apache.streampipes.sdk.utils.Datatypes; +import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeSink; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class DataLakeSinkMigrationV2Test { + + @Test + public void migrate() { + var dataLakeSinkMigrationV2 = new DataLakeSinkMigrationV2(); + + var stream = new SpDataStream(); + stream.setEventSchema(makeSchema()); + var extractor = mock(DataSinkParameterExtractor.class); + var invocation = new DataSinkInvocation(); + invocation.setStaticProperties(new ArrayList<>()); + invocation.setInputStreams(List.of(stream)); + + var actual = dataLakeSinkMigrationV2.migrate(invocation, extractor); + + Assertions.assertTrue(actual.success()); + Assertions.assertEquals(actual.element() + .getStaticProperties() + .size(), 1); + var dimensionConfig = 
getAnyStaticProperty(actual); + Assertions.assertEquals(dimensionConfig.getInternalName(), DataLakeSink.DIMENSIONS_KEY); + Assertions.assertEquals(dimensionConfig.getOptions().size(), 3); + Assertions.assertTrue(dimensionConfig.getOptions().get(0).isSelected()); + Assertions.assertFalse(dimensionConfig.getOptions().get(1).isSelected()); + Assertions.assertFalse(dimensionConfig.getOptions().get(2).isSelected()); + } + + private static RuntimeResolvableAnyStaticProperty getAnyStaticProperty(MigrationResult<DataSinkInvocation> actual) { + return (RuntimeResolvableAnyStaticProperty) actual.element() + .getStaticProperties() + .get(0); + } + + private static EventSchema makeSchema() { + return new EventSchema( + List.of( + PrimitivePropertyBuilder + .create(Datatypes.String, "a") + .scope(PropertyScope.DIMENSION_PROPERTY) + .build(), + PrimitivePropertyBuilder + .create(Datatypes.Float, "b") + .scope(PropertyScope.MEASUREMENT_PROPERTY) + .build(), + PrimitivePropertyBuilder + .create(Datatypes.Integer, "c") + .scope(PropertyScope.MEASUREMENT_PROPERTY) + .build(), + PrimitivePropertyBuilder + .create(Datatypes.Boolean, "d") + .scope(PropertyScope.MEASUREMENT_PROPERTY) + .build() + ) + ); + } +} diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java index 2bec1f786f..7160c68e6b 100644 --- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java +++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java @@ -570,4 +570,10 @@ public abstract class AbstractParameterExtractor<T extends InvocableStreamPipesE ep.getPropertyScope() != null && ep.getPropertyScope().equals(scope.name())) .collect(Collectors.toList()); } + + @Override + public List<EventProperty> getInputEventProperties(int streamIndex) { + return 
!sepaElement.getInputStreams().isEmpty() + ? sepaElement.getInputStreams().get(0).getEventSchema().getEventProperties() : List.of(); + } }
