This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 849e431c0 [#1190] Apache TubeMQ (InLong) Adapter & Sink (#1191)
849e431c0 is described below
commit 849e431c08f467c2f233f45cfc7bfa8c4fb8d839
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Feb 3 14:35:57 2023 +0800
[#1190] Apache TubeMQ (InLong) Adapter & Sink (#1191)
* [TubeMQ Sink] implement org.apache.streampipes.sinks.brokers.jvm.tubemq
* [TubeMQ Sink] registerPipelineElements
* [TubeMQ Sink] TubeMQ resources
* [TubeMQ Adapter] TubeMQ adapter impl
* [TubeMQ] docker-compose
* [TubeMQ] fix resources: icon.svg -> icon.png
* [TubeMQ] expose 8080 and 8081 in dev.docker
* [TubeMQ] change service name in docker file to "tubemq"
* [TubeMQ Adapter] add a short description of the input parameters in doc
---
.../standalone/tubemq/docker-compose.dev.yml | 25 +++
.../deploy/standalone/tubemq/docker-compose.yml | 35 ++++
pom.xml | 6 +
.../streampipes-connect-adapters-iiot/pom.xml | 4 +
.../connect/iiot/ConnectAdapterIiotInit.java | 2 +
.../iiot/protocol/stream/TubeMQProtocol.java | 210 +++++++++++++++++++++
.../documentation.md | 48 +++++
.../icon.png | Bin 0 -> 22024 bytes
.../strings.en | 29 +++
.../streampipes-sinks-brokers-jvm/pom.xml | 4 +
.../sinks/brokers/jvm/BrokersJvmInit.java | 2 +
.../sinks/brokers/jvm/tubemq/TubeMQParameters.java | 46 +++++
.../brokers/jvm/tubemq/TubeMQPublisherSink.java | 106 +++++++++++
.../documentation.md | 55 ++++++
.../icon.png | Bin 0 -> 22024 bytes
.../strings.en | 25 +++
16 files changed, 597 insertions(+)
diff --git a/installer/cli/deploy/standalone/tubemq/docker-compose.dev.yml
b/installer/cli/deploy/standalone/tubemq/docker-compose.dev.yml
new file mode 100644
index 000000000..44ae40e08
--- /dev/null
+++ b/installer/cli/deploy/standalone/tubemq/docker-compose.dev.yml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3.4"
+services:
+ tubemq:
+ image: inlong/tubemq-all:1.5.0
+ ports:
+ - "8715:8715"
+ - "8123:8123"
+ # web
+ - "8080:8080"
+ - "8081:8081"
diff --git a/installer/cli/deploy/standalone/tubemq/docker-compose.yml
b/installer/cli/deploy/standalone/tubemq/docker-compose.yml
new file mode 100644
index 000000000..af677fd5c
--- /dev/null
+++ b/installer/cli/deploy/standalone/tubemq/docker-compose.yml
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3.4"
+services:
+ tubemq:
+ image: inlong/tubemq-all:1.5.0
+ volumes:
+ - tubemq:/tubemq
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "1m"
+ max-file: "1"
+ networks:
+ spnet:
+
+volumes:
+ tubemq:
+
+networks:
+ spnet:
+ external: true
diff --git a/pom.xml b/pom.xml
index 2163b84e0..6d1dbf8b5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -183,6 +183,7 @@
<snakeyaml.version>1.26</snakeyaml.version>
<xerces.version>2.12.2</xerces.version>
<zstd-jni.version>1.4.3-1</zstd-jni.version>
+ <inlong.version>1.5.0</inlong.version>
<maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
<maven.javadoc.plugin.version>3.1.1</maven.javadoc.plugin.version>
@@ -1358,6 +1359,11 @@
<artifactId>quartz</artifactId>
<version>${quartz.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>tubemq-client</artifactId>
+ <version>${inlong.version}</version>
+ </dependency>
<!-- dependency convergence -->
<dependency>
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml
b/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml
index 7d7b3faa4..74300e7c9 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml
@@ -143,6 +143,10 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>tubemq-client</artifactId>
+ </dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk14</artifactId>
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
index 451812659..fab0dde2e 100644
---
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
@@ -31,6 +31,7 @@ import
org.apache.streampipes.connect.iiot.protocol.stream.HttpStreamProtocol;
import org.apache.streampipes.connect.iiot.protocol.stream.KafkaProtocol;
import org.apache.streampipes.connect.iiot.protocol.stream.MqttProtocol;
import org.apache.streampipes.connect.iiot.protocol.stream.NatsProtocol;
+import org.apache.streampipes.connect.iiot.protocol.stream.TubeMQProtocol;
import
org.apache.streampipes.connect.iiot.protocol.stream.pulsar.PulsarProtocol;
import
org.apache.streampipes.connect.iiot.protocol.stream.rocketmq.RocketMQProtocol;
import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
@@ -63,6 +64,7 @@ public class ConnectAdapterIiotInit extends
ExtensionsModelSubmitter {
.registerAdapter(new PulsarProtocol())
.registerAdapter(new RocketMQProtocol())
.registerAdapter(new HttpServerProtocol())
+ .registerAdapter(new TubeMQProtocol())
.build();
}
}
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/TubeMQProtocol.java
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/TubeMQProtocol.java
new file mode 100644
index 000000000..7a042de33
--- /dev/null
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/TubeMQProtocol.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.iiot.protocol.stream;
+
+import org.apache.streampipes.extensions.api.connect.IAdapterPipeline;
+import org.apache.streampipes.extensions.api.connect.IFormat;
+import org.apache.streampipes.extensions.api.connect.IParser;
+import org.apache.streampipes.extensions.api.connect.IProtocol;
+import
org.apache.streampipes.extensions.api.connect.exception.AdapterException;
+import org.apache.streampipes.extensions.api.connect.exception.ParseException;
+import org.apache.streampipes.extensions.management.connect.SendToPipeline;
+import
org.apache.streampipes.extensions.management.connect.adapter.sdk.ParameterExtractor;
+import org.apache.streampipes.model.AdapterType;
+import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
+import org.apache.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder;
+import org.apache.streampipes.sdk.helpers.AdapterSourceType;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.inlong.tubemq.client.common.PeerInfo;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
+import org.apache.inlong.tubemq.client.consumer.MessageListener;
+import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+
+public class TubeMQProtocol extends BrokerProtocol {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TubeMQProtocol.class);
+
+ public static final String ID =
"org.apache.streampipes.connect.iiot.protocol.stream.tubemq";
+
+ public static final String TOPIC_KEY = "tubemq-topic";
+ public static final String MASTER_HOST_AND_PORT_KEY =
"tubemq-master-host-and-port";
+ public static final String CONSUMER_GROUP_KEY = "tubemq-consumer-group";
+
+ private String consumerGroup;
+
+ private MessageSessionFactory messageSessionFactory;
+ private PushMessageConsumer pushConsumer;
+
+ public TubeMQProtocol() {
+ }
+
+ private TubeMQProtocol(IParser parser, IFormat format, String
masterHostAndPort, String topic, String consumerGroup) {
+ super(parser, format, masterHostAndPort, topic);
+ this.consumerGroup = consumerGroup;
+ }
+
+ @Override
+ public IProtocol getInstance(ProtocolDescription protocolDescription,
IParser parser, IFormat format) {
+ final ParameterExtractor extractor = new
ParameterExtractor(protocolDescription.getConfig());
+
+ final String masterHostAndPort =
extractor.singleValue(MASTER_HOST_AND_PORT_KEY, String.class);
+ final String topic = extractor.singleValue(TOPIC_KEY, String.class);
+ final String consumerGroup = extractor.singleValue(CONSUMER_GROUP_KEY,
String.class);
+
+ return new TubeMQProtocol(parser, format, masterHostAndPort, topic,
consumerGroup);
+ }
+
+ @Override
+ public ProtocolDescription declareModel() {
+ return
ProtocolDescriptionBuilder.create(ID).withAssets(Assets.DOCUMENTATION,
Assets.ICON).withLocales(Locales.EN)
+ .category(AdapterType.Generic).sourceType(AdapterSourceType.STREAM)
+
.requiredTextParameter(Labels.withId(MASTER_HOST_AND_PORT_KEY)).requiredTextParameter(Labels.withId(TOPIC_KEY))
+ .requiredTextParameter(Labels.withId(CONSUMER_GROUP_KEY)).build();
+ }
+
+ @Override
+ public void run(IAdapterPipeline adapterPipeline) throws AdapterException {
+ final SendToPipeline sendToPipeline = new SendToPipeline(format,
adapterPipeline);
+
+ final ConsumerConfig consumerConfig = new ConsumerConfig(brokerUrl,
consumerGroup);
+
consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+
+ try {
+ messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
+ pushConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
+
+ pushConsumer.subscribe(topic, null, new MessageListener() {
+ @Override
+ public void receiveMessages(PeerInfo peerInfo, List<Message> messages)
{
+ for (final Message message : messages) {
+ try {
+ parser.parse(IOUtils.toInputStream(new
String(message.getData()), "UTF-8"), sendToPipeline);
+ } catch (ParseException e) {
+ LOGGER.error("Error while parsing: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ public Executor getExecutor() {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+ }
+ });
+ pushConsumer.completeSubscribe();
+ } catch (TubeClientException e) {
+ shutdown(messageSessionFactory, pushConsumer);
+ throw new AdapterException("Failed to create TubeMQ adapter.", e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ shutdown(messageSessionFactory, pushConsumer);
+ }
+
+ @Override
+ public String getId() {
+ return ID;
+ }
+
+ @Override
+ protected List<byte[]> getNByteElements(int n) throws ParseException {
+ final List<byte[]> elements = new ArrayList<>();
+
+ final ConsumerConfig consumerConfig = new ConsumerConfig(brokerUrl,
consumerGroup);
+
consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_FIRST_OFFSET);
+
+ MessageSessionFactory messageSessionFactory = null;
+ PushMessageConsumer pushConsumer = null;
+ try {
+ messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
+ pushConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
+
+ final CountDownLatch countDownLatch = new CountDownLatch(n);
+ pushConsumer.subscribe(topic, null, new MessageListener() {
+ @Override
+ public void receiveMessages(PeerInfo peerInfo, List<Message> messages)
{
+ for (final Message message : messages) {
+ if (countDownLatch.getCount() == 0) {
+ return;
+ }
+ elements.add(message.getData());
+ countDownLatch.countDown();
+ }
+ }
+
+ @Override
+ public Executor getExecutor() {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+ }
+ });
+ pushConsumer.completeSubscribe();
+ countDownLatch.await();
+ } catch (TubeClientException | InterruptedException e) {
+ throw new ParseException("Failed to getNByteElements", e);
+ } finally {
+ shutdown(messageSessionFactory, pushConsumer);
+ }
+
+ return elements;
+ }
+
+ private static void shutdown(MessageSessionFactory messageSessionFactory,
PushMessageConsumer pushConsumer) {
+ if (pushConsumer != null && !pushConsumer.isShutdown()) {
+ try {
+ pushConsumer.shutdown();
+ } catch (Throwable ex) {
+ LOGGER.error("Failed to stop pushConsumer when TubeClientException
occurred.");
+ }
+ }
+
+ if (messageSessionFactory != null) {
+ try {
+ messageSessionFactory.shutdown();
+ } catch (TubeClientException ex) {
+ LOGGER.error("Failed to stop messageSessionFactory when
TubeClientException occurred.");
+ }
+ }
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/documentation.md
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/documentation.md
new file mode 100644
index 000000000..dbc91d9c2
--- /dev/null
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/documentation.md
@@ -0,0 +1,48 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+## Apache TubeMQ (InLong)
+
+<p align="center">
+ <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
+
+Consumes messages from an Apache TubeMQ broker.
+
+***
+
+## Configuration
+
+### TubeMQ Master Information
+
+This field describes the endpoints of all the TubeMQ masters.
+
+The format should be like `ip1:port1,ip2:port2,ip3:port3`.
+
+### TubeMQ Topic
+
+The topic where events should be sent to.
+
+### TubeMQ Consumer Group
+
+The consumer group of the TubeMQ Consumer.
+
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/icon.png
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/icon.png
new file mode 100644
index 000000000..7c0fde0bc
Binary files /dev/null and
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/icon.png
differ
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/strings.en
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/strings.en
new file mode 100644
index 000000000..d8eadb8d0
--- /dev/null
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/strings.en
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+org.apache.streampipes.connect.iiot.protocol.stream.tubemq.title=Apache TubeMQ
(InLong)
+org.apache.streampipes.connect.iiot.protocol.stream.tubemq.description=Consumes
messages from an Apache TubeMQ (InLong) broker.
+
+tubemq-master-host-and-port.title=TubeMQ Master Information
+tubemq-master-host-and-port.description=This field describes the endpoints of
all the TubeMQ masters. The format should be like
"ip1:port1,ip2:port2,ip3:port3".
+
+tubemq-consumer-group.title=TubeMQ Consumer Group
+tubemq-consumer-group.description=The consumer group of the TubeMQ Consumer
+
+tubemq-topic.title=TubeMQ Topic
+tubemq-topic.description=Select a TubeMQ topic
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml
b/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml
index fb63ad3c0..cf3ce5d56 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml
@@ -88,6 +88,10 @@
<artifactId>jnats</artifactId>
<version>${nats.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>tubemq-client</artifactId>
+ </dependency>
<!-- dependency convergence -->
<dependency>
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
index a6ccb66f4..e242e18e4 100644
---
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
@@ -37,6 +37,7 @@ import
org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink;
import org.apache.streampipes.sinks.brokers.jvm.rabbitmq.RabbitMqController;
import org.apache.streampipes.sinks.brokers.jvm.rest.RestController;
import org.apache.streampipes.sinks.brokers.jvm.rocketmq.RocketMQPublisherSink;
+import org.apache.streampipes.sinks.brokers.jvm.tubemq.TubeMQPublisherSink;
import org.apache.streampipes.sinks.brokers.jvm.websocket.WebsocketServerSink;
public class BrokersJvmInit extends ExtensionsModelSubmitter {
@@ -61,6 +62,7 @@ public class BrokersJvmInit extends ExtensionsModelSubmitter {
new WebsocketServerSink(),
new PulsarPublisherSink(),
new RocketMQPublisherSink(),
+ new TubeMQPublisherSink(),
new NatsController())
.registerMessagingFormats(
new JsonDataFormatFactory(),
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQParameters.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQParameters.java
new file mode 100644
index 000000000..9a81f8b51
--- /dev/null
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQParameters.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.brokers.jvm.tubemq;
+
+import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+
+import static
org.apache.streampipes.sinks.brokers.jvm.tubemq.TubeMQPublisherSink.MASTER_HOST_AND_PORT_KEY;
+import static
org.apache.streampipes.sinks.brokers.jvm.tubemq.TubeMQPublisherSink.TOPIC_KEY;
+
+public class TubeMQParameters {
+
+ private final String masterHostAndPort;
+ private final String topic;
+
+ public TubeMQParameters(SinkParams parameters) {
+ DataSinkParameterExtractor extractor = parameters.extractor();
+
+ this.masterHostAndPort =
extractor.singleValueParameter(MASTER_HOST_AND_PORT_KEY, String.class);
+ this.topic = extractor.singleValueParameter(TOPIC_KEY, String.class);
+ }
+
+ public String getMasterHostAndPort() {
+ return masterHostAndPort;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQPublisherSink.java
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQPublisherSink.java
new file mode 100644
index 000000000..deeb2dcc3
--- /dev/null
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQPublisherSink.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.brokers.jvm.tubemq;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.SpDataFormatDefinition;
+import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
+import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.sdk.builder.DataSinkBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
+
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.client.producer.MessageProducer;
+import org.apache.inlong.tubemq.client.producer.MessageSentResult;
+import org.apache.inlong.tubemq.corebase.Message;
+
+import java.util.Map;
+
+public class TubeMQPublisherSink extends StreamPipesDataSink {
+
+ public static final String MASTER_HOST_AND_PORT_KEY =
"tubemq-master-host-and-port";
+ public static final String TOPIC_KEY = "tubemq-topic";
+
+ private SpDataFormatDefinition spDataFormatDefinition;
+ private String topic;
+
+ private MessageProducer messageProducer;
+
+ @Override
+ public DataSinkDescription declareModel() {
+ return
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.tubemq").category(DataSinkType.MESSAGING)
+ .withLocales(Locales.EN).withAssets(Assets.DOCUMENTATION, Assets.ICON)
+
.requiredStream(StreamRequirementsBuilder.create().requiredProperty(EpRequirements.anyProperty()).build())
+
.requiredTextParameter(Labels.withId(MASTER_HOST_AND_PORT_KEY)).requiredTextParameter(Labels.withId(TOPIC_KEY))
+ .build();
+ }
+
+ @Override
+ public void onInvocation(SinkParams sinkParams, EventSinkRuntimeContext
runtimeContext) throws SpRuntimeException {
+ final TubeMQParameters tubeMQParameters = new TubeMQParameters(sinkParams);
+
+ spDataFormatDefinition = new JsonDataFormatDefinition();
+ topic = tubeMQParameters.getTopic();
+
+ final TubeClientConfig tubeClientConfig = new
TubeClientConfig(tubeMQParameters.getMasterHostAndPort());
+ try {
+ messageProducer = new
TubeSingleSessionFactory(tubeClientConfig).createProducer();
+ messageProducer.publish(topic);
+ } catch (TubeClientException e) {
+ throw new SpRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void onEvent(Event event) throws SpRuntimeException {
+ final Map<String, Object> eventRawMap = event.getRaw();
+ final byte[] eventMessage = spDataFormatDefinition.fromMap(eventRawMap);
+ final Message tubemqMessage = new Message(topic, eventMessage);
+
+ try {
+ final MessageSentResult result =
messageProducer.sendMessage(tubemqMessage);
+ if (!result.isSuccess()) {
+ throw new SpRuntimeException(
+ String.format("Failed to send message: %s, because: %s",
tubemqMessage, result.getErrMsg()));
+ }
+ } catch (TubeClientException | InterruptedException e) {
+ throw new SpRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+ try {
+ messageProducer.shutdown();
+ } catch (Throwable e) {
+ throw new SpRuntimeException(e);
+ }
+ }
+}
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/documentation.md
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/documentation.md
new file mode 100644
index 000000000..6971c693d
--- /dev/null
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/documentation.md
@@ -0,0 +1,55 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+## TubeMQ (InLong) Publisher
+
+<p align="center">
+ <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
+
+Publishes events to Apache TubeMQ (InLong).
+
+***
+
+## Required Inputs
+
+This sink does not have any requirements and works with any incoming event
type.
+
+***
+
+## Configuration
+
+### TubeMQ Master Information
+
+This field describes the endpoints of all the TubeMQ masters.
+
+The format should be like `ip1:port1,ip2:port2,ip3:port3`.
+
+
+### TubeMQ Topic
+
+The topic where events should be sent to.
+
+
+## Output
+
+(not applicable for data sinks)
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/icon.png
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/icon.png
new file mode 100644
index 000000000..7c0fde0bc
Binary files /dev/null and
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/icon.png
differ
diff --git
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/strings.en
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/strings.en
new file mode 100644
index 000000000..9ac5b877b
--- /dev/null
+++
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/strings.en
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.streampipes.sinks.brokers.jvm.tubemq.title=TubeMQ (InLong) Publisher
+org.apache.streampipes.sinks.brokers.jvm.tubemq.description=Publish events to
Apache TubeMQ (InLong)
+
+tubemq-master-host-and-port.title=TubeMQ Master Information
+tubemq-master-host-and-port.description=This field describes the endpoints of
all the TubeMQ masters. The format should be like
"ip1:port1,ip2:port2,ip3:port3".
+
+tubemq-topic.title=TubeMQ Topic
+tubemq-topic.description=Select a TubeMQ topic