This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch rel/0.91.0
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/rel/0.91.0 by this push:
new 54c908536 1423 update archetypes (#1425)
54c908536 is described below
commit 54c908536f5aedc8bf05c7e63940c3e5b6b38466
Author: Dominik Riemer <[email protected]>
AuthorDate: Tue Mar 21 16:36:31 2023 +0100
1423 update archetypes (#1425)
* Update maven archetypes (#1423)
* Add slf4j-api to base service module (#1424)
---
.../src/main/resources/archetype-resources/pom.xml | 12 ++--
.../archetype-resources/src/main/java/Init.java | 6 +-
.../src/main/resources/archetype-resources/pom.xml | 61 ++---------------
.../archetype-resources/src/main/java/Init.java | 10 ++-
.../__classNamePrefix__Controller.java | 2 +-
.../__classNamePrefix__Program.java | 2 +-
.../src/main/resources/archetype-resources/pom.xml | 56 +---------------
.../archetype-resources/src/main/java/Init.java | 52 +++++++++------
.../src/main/java/config/Config.java | 76 ----------------------
.../src/main/java/config/ConfigKeys.java | 15 +++--
.../__classNamePrefix__Controller.java | 11 ++--
.../__classNamePrefix__Program.java | 23 +++++--
streampipes-service-base/pom.xml | 4 ++
13 files changed, 92 insertions(+), 238 deletions(-)
diff --git a/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/pom.xml
index dd9ec81cb..b71b32cfb 100644
--- a/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/pom.xml
+++ b/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/pom.xml
@@ -44,6 +44,11 @@
<artifactId>streampipes-sources</artifactId>
<version>${sp.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>2.0.6</version>
+ </dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
@@ -62,7 +67,7 @@
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
- <version>2.6.2</version>
+ <version>3.0.1</version>
<executions>
<execution>
<goals>
@@ -77,10 +82,9 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
+ <version>3.10.1</version>
<configuration>
- <source>1.8</source>
- <target>1.8</target>
+ <release>17</release>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
diff --git a/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/src/main/java/Init.java b/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/src/main/java/Init.java
index 4863966d1..777e6dc31 100644
--- a/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/src/main/java/Init.java
+++ b/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/src/main/java/Init.java
@@ -21,16 +21,16 @@
#set( $symbol_escape = '\' )
package ${package};
-import org.apache.streampipes.container.extensions.ExtensionsModelSubmitter;
-import org.apache.streampipes.container.model.SpServiceDefinition;
-import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
+import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
+import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
+import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
import ${package}.pe.${packageName}.${classNamePrefix}DataProcessor;
import ${package}.pe.${packageName}.${classNamePrefix}DataSink;
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml
index 254fad6ef..664c54d09 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml
@@ -31,22 +31,7 @@
<dependencies>
<dependency>
<groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-container-standalone</artifactId>
- <version>${sp.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-commons</artifactId>
+ <artifactId>streampipes-service-extensions</artifactId>
<version>${sp.version}</version>
<exclusions>
<exclusion>
@@ -92,48 +77,13 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>1.7.24</version>
+ <version>2.0.6</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-config</artifactId>
<version>${sp.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-dataformat-json</artifactId>
- <version>${sp.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-dataformat-cbor</artifactId>
- <version>${sp.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-dataformat-smile</artifactId>
- <version>${sp.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-dataformat-fst</artifactId>
- <version>${sp.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-messaging-jms</artifactId>
- <version>${sp.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-messaging-kafka</artifactId>
- <version>${sp.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-messaging-mqtt</artifactId>
- <version>${sp.version}</version>
- </dependency>
</dependencies>
<build>
@@ -146,7 +96,7 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
- <version>2.4.1</version>
+ <version>3.0.1</version>
</dependency>
</dependencies>
<executions>
@@ -202,10 +152,9 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
+ <version>3.10.1</version>
<configuration>
- <source>1.8</source>
- <target>1.8</target>
+ <release>17</release>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java
index 8b05fdfe3..dc52194fb 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java
@@ -21,11 +21,6 @@
package ${package};
-import org.apache.streampipes.container.init.DeclarersSingleton;
-import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
-import org.apache.streampipes.container.model.SpServiceDefinition;
-import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
-
import ${package}.config.ConfigKeys;
import ${package}.pe.processor.${packageName}.${classNamePrefix}Controller;
@@ -34,11 +29,14 @@ import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
+import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
+import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
+import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
-public class Init extends StandaloneModelSubmitter {
+public class Init extends ExtensionsModelSubmitter {
public static void main(String[] args) throws Exception {
new Init().init();
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java
index 981a6a8c9..dc810a0ce 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java
@@ -21,10 +21,10 @@
package ${package}.pe.processor.${packageName};
+import org.apache.streampipes.extensions.management.config.ConfigExtractor;
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.utils.Datatypes;
import org.apache.streampipes.client.StreamPipesClient;
-import org.apache.streampipes.container.config.ConfigExtractor;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java
index 929e3a87a..fcad195dc 100644
--- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java
+++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java
@@ -26,7 +26,7 @@ import ${package}.config.ConfigKeys;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.streampipes.client.StreamPipesClient;
-import org.apache.streampipes.container.config.ConfigExtractor;
+import org.apache.streampipes.extensions.management.config.ConfigExtractor;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml
index 63155f699..e3ef7c0bf 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml
@@ -31,22 +31,7 @@
<dependencies>
<dependency>
<groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-container-standalone</artifactId>
- <version>${sp.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-commons</artifactId>
+ <artifactId>streampipes-service-extensions</artifactId>
<version>${sp.version}</version>
<exclusions>
<exclusion>
@@ -81,7 +66,7 @@
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-sdk</artifactId>
+ <artifactId>streampipes-sdk-bundle</artifactId>
<version>${sp.version}</version>
</dependency>
<dependency>
@@ -92,48 +77,13 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>1.7.24</version>
+ <version>2.0.6</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-config</artifactId>
<version>${sp.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-dataformat-json</artifactId>
- <version>${sp.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-dataformat-cbor</artifactId>
- <version>${sp.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-dataformat-smile</artifactId>
- <version>${sp.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-dataformat-fst</artifactId>
- <version>${sp.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-messaging-jms</artifactId>
- <version>${sp.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-messaging-kafka</artifactId>
- <version>${sp.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-messaging-mqtt</artifactId>
- <version>${sp.version}</version>
- </dependency>
</dependencies>
<build>
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java
index 39235cb3d..ef8c40914 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java
@@ -21,36 +21,48 @@
#set( $symbol_escape = '\' )
package ${package};
-import org.apache.streampipes.container.init.DeclarersSingleton;
-import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
-import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
-import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
+import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
+import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
+import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
+import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
+import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
-import ${package}.config.Config;
+import ${package}.config.ConfigKeys;
import ${package}.pe.sink.${packageName}.${classNamePrefix}Controller;
-public class Init extends StandaloneModelSubmitter {
+public class Init extends ExtensionsModelSubmitter {
- public static void main(String[] args) {
- DeclarersSingleton.getInstance()
- .add(new ${classNamePrefix}Controller());
-
- DeclarersSingleton.getInstance().registerDataFormats(
- new JsonDataFormatFactory(),
- new CborDataFormatFactory(),
- new SmileDataFormatFactory(),
- new FstDataFormatFactory());
+ public static void main(String[] args) throws Exception {
+ new Init().init();
+ }
- DeclarersSingleton.getInstance().registerProtocols(
+ @Override
+ public SpServiceDefinition provideServiceDefinition() {
+ return SpServiceDefinitionBuilder.create("${package}",
+ "Apache Flink sink",
+ "",
+ 8090)
+ .registerPipelineElement(new ${classNamePrefix}Controller())
+ .registerMessagingFormats(
+ new JsonDataFormatFactory(),
+ new CborDataFormatFactory(),
+ new SmileDataFormatFactory(),
+ new FstDataFormatFactory())
+ .registerMessagingProtocols(
new SpKafkaProtocolFactory(),
- new SpMqttProtocolFactory(),
- new SpJmsProtocolFactory());
-
- new Init().init(Config.INSTANCE);
+ new SpJmsProtocolFactory(),
+ new SpMqttProtocolFactory())
+ .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager")
+ .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager")
+ .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program")
+ .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location")
+ .addConfig(ConfigKeys.SERVICE_NAME, "sp fft stream analytics metrics", "Data processor service name")
+ .addConfig(ConfigKeys.HOST, "${artifactId}", "Data processor host")
+ .build();
}
}
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java
deleted file mode 100644
index 16e02a446..000000000
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#set( $symbol_pound = '#' )
-#set( $symbol_dollar = '$' )
-#set( $symbol_escape = '\' )
-#set( $svc_name = $package.getClass().forName("org.apache.velocity.util.StringUtils").sub("$artifactId", "-", " ") )
-package ${package}.config;
-
-import org.apache.streampipes.config.SpConfig;
-import org.apache.streampipes.container.model.PeConfig;
-
-public enum Config implements PeConfig {
- INSTANCE;
-
- private SpConfig config;
- public static final String JAR_FILE = "./streampipes-processing-element-container.jar";
- private final static String SERVICE_ID = "pe/${package}.sink.flink";
-
- Config() {
- config = SpConfig.getSpConfig(SERVICE_ID);
- config.register(ConfigKeys.HOST, "${artifactId}", "Data sink host");
- config.register(ConfigKeys.PORT, 8090, "Data sink port");
- config.register(ConfigKeys.SERVICE_NAME, "${svc_name}", "Data sink service name");
- config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Flink jobmanager host");
- config.register(ConfigKeys.FLINK_PORT, 8081, "Flink jobmanager port");
- config.register(ConfigKeys.FLINK_DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally");
- }
-
- public String getFlinkHost() {
- return config.getString(ConfigKeys.FLINK_HOST);
- }
-
- public int getFlinkPort() {
- return config.getInteger(ConfigKeys.FLINK_PORT);
- }
-
- public boolean getFlinkDebug() {
- return config.getBoolean(ConfigKeys.FLINK_DEBUG);
- }
-
- @Override
- public String getHost() {
- return config.getString(ConfigKeys.HOST);
- }
-
- @Override
- public int getPort() {
- return config.getInteger(ConfigKeys.PORT);
- }
-
- @Override
- public String getId() {
- return SERVICE_ID;
- }
-
- @Override
- public String getName() {
- return config.getString(ConfigKeys.SERVICE_NAME);
- }
-
-}
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
index e307f0da9..a828f4810 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java
@@ -22,10 +22,11 @@
package ${package}.config;
public class ConfigKeys {
- final static String HOST = "SP_HOST";
- final static String PORT = "SP_PORT";
- final static String SERVICE_NAME = "SP_SERVICE_NAME";
- final static String FLINK_HOST = "SP_FLINK_HOST";
- final static String FLINK_PORT = "SP_FLINK_PORT";
- final static String FLINK_DEBUG = "SP_FLINK_DEBUG";
-}
\ No newline at end of file
+ public final static String HOST = "SP_HOST";
+ public final static String PORT = "SP_PORT";
+ public final static String SERVICE_NAME = "SP_SERVICE_NAME";
+ public final static String FLINK_HOST = "SP_FLINK_HOST";
+ public final static String FLINK_PORT = "SP_FLINK_PORT";
+ public static final String DEBUG = "SP_FLINK_DEBUG";
+ public static final String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC";
+}
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java
index b0bf3ee6b..eaae81d25 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java
@@ -21,6 +21,8 @@
package ${package}.pe.sink.${packageName};
import ${package}.config.Config;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.extensions.management.config.ConfigExtractor;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.graph.DataSinkInvocation;
@@ -29,8 +31,6 @@ import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.SupportedFormats;
-import org.apache.streampipes.sdk.helpers.SupportedProtocols;
import org.apache.streampipes.wrapper.flink.FlinkDataSinkDeclarer;
import org.apache.streampipes.wrapper.flink.FlinkDataSinkRuntime;
import org.apache.streampipes.sdk.helpers.*;
@@ -59,13 +59,16 @@ public class ${classNamePrefix}Controller extends FlinkDataSinkDeclarer<${classN
}
@Override
- public FlinkDataSinkRuntime<${classNamePrefix}Parameters> getRuntime(DataSinkInvocation graph, DataSinkParameterExtractor extractor) {
+ public FlinkDataSinkRuntime<${classNamePrefix}Parameters> getRuntime(DataSinkInvocation graph,
+ DataSinkParameterExtractor extractor,
+ ConfigExtractor configExtractor,
+ StreamPipesClient streamPipesClient) {
String host = extractor.singleValueParameter(HOST_KEY, String.class);
int port = extractor.singleValueParameter(PORT_KEY, Integer.class);
String password = extractor.secretValue(PASSWORD_KEY);
${classNamePrefix}Parameters params = new ${classNamePrefix}Parameters(graph, host, port, password);
- return new ${classNamePrefix}Program(params, Config.INSTANCE.getFlinkDebug());
+ return new ${classNamePrefix}Program(params, configExtractor, streamPipesClient);
}
}
diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Program.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Program.java
index 3fedc44d7..621a810b6 100644
--- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Program.java
+++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Program.java
@@ -21,13 +21,17 @@
#set( $symbol_escape = '\' )
package ${package}.pe.sink.${packageName};
-import ${package}.config.Config;
+import ${package}.config.ConfigKeys;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.extensions.management.config.ConfigExtractor;
import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
import org.apache.streampipes.wrapper.flink.FlinkDataSinkRuntime;
import org.apache.streampipes.wrapper.flink.FlinkDeploymentConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
import java.io.Serializable;
public class ${classNamePrefix}Program extends FlinkDataSinkRuntime<${classNamePrefix}Parameters>
@@ -36,15 +40,20 @@ implements Serializable {
private static final long serialVersionUID = 1L;
private final ${classNamePrefix}Parameters params;
- public ${classNamePrefix}Program(${classNamePrefix}Parameters params, boolean debug) {
- super(params, debug);
+ public ${classNamePrefix}Program(${classNamePrefix}Parameters params,
+ ConfigExtractor configExtractor,
+ StreamPipesClient streamPipesClient) {
+ super(params, configExtractor, streamPipesClient);
this.params = params;
}
@Override
- protected FlinkDeploymentConfig getDeploymentConfig() {
- return new FlinkDeploymentConfig(Config.JAR_FILE,
- Config.INSTANCE.getFlinkHost(), Config.INSTANCE.getFlinkPort());
+ protected FlinkDeploymentConfig getDeploymentConfig(ConfigExtractor configExtractor) {
+ SpConfig config = configExtractor.getConfig();
+ return new FlinkDeploymentConfig(config.getString(ConfigKeys.FLINK_JAR_FILE_LOC),
+ config.getString(ConfigKeys.FLINK_HOST),
+ config.getInteger(ConfigKeys.FLINK_PORT),
+ config.getBoolean(ConfigKeys.DEBUG));
}
@Override
diff --git a/streampipes-service-base/pom.xml b/streampipes-service-base/pom.xml
index 1dbba04e6..5f030f24f 100644
--- a/streampipes-service-base/pom.xml
+++ b/streampipes-service-base/pom.xml
@@ -105,6 +105,10 @@
<artifactId>org.osgi.core</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>