This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 849e431c0 [#1190] Apache TubeMQ (InLong) Adapter & Sink (#1191)
849e431c0 is described below

commit 849e431c08f467c2f233f45cfc7bfa8c4fb8d839
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Feb 3 14:35:57 2023 +0800

    [#1190] Apache TubeMQ (InLong) Adapter & Sink (#1191)
    
    * [TubeMQ Sink] implement org.apache.streampipes.sinks.brokers.jvm.tubemq
    
    * [TubeMQ Sink] registerPipelineElements
    
    * [TubeMQ Sink] TubeMQ resources
    
    * [TubeMQ Adapter] TubeMQ adapter impl
    
    * [TubeMQ] docker-compose
    
    * [TubeMQ] fix resources: icon.svg -> icon.png
    
    * [TubeMQ] expose 8080 and 8081 in dev.docker
    
    * [TubeMQ] change service name in docker file to "tubemq"
    
    * [TubeMQ Adapter] add a short description of the input parameters in doc
---
 .../standalone/tubemq/docker-compose.dev.yml       |  25 +++
 .../deploy/standalone/tubemq/docker-compose.yml    |  35 ++++
 pom.xml                                            |   6 +
 .../streampipes-connect-adapters-iiot/pom.xml      |   4 +
 .../connect/iiot/ConnectAdapterIiotInit.java       |   2 +
 .../iiot/protocol/stream/TubeMQProtocol.java       | 210 +++++++++++++++++++++
 .../documentation.md                               |  48 +++++
 .../icon.png                                       | Bin 0 -> 22024 bytes
 .../strings.en                                     |  29 +++
 .../streampipes-sinks-brokers-jvm/pom.xml          |   4 +
 .../sinks/brokers/jvm/BrokersJvmInit.java          |   2 +
 .../sinks/brokers/jvm/tubemq/TubeMQParameters.java |  46 +++++
 .../brokers/jvm/tubemq/TubeMQPublisherSink.java    | 106 +++++++++++
 .../documentation.md                               |  55 ++++++
 .../icon.png                                       | Bin 0 -> 22024 bytes
 .../strings.en                                     |  25 +++
 16 files changed, 597 insertions(+)

diff --git a/installer/cli/deploy/standalone/tubemq/docker-compose.dev.yml 
b/installer/cli/deploy/standalone/tubemq/docker-compose.dev.yml
new file mode 100644
index 000000000..44ae40e08
--- /dev/null
+++ b/installer/cli/deploy/standalone/tubemq/docker-compose.dev.yml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Dev compose file for the TubeMQ service: publishes the container ports on the host.
+version: "3.4"
+services:
+  tubemq:
+    image: inlong/tubemq-all:1.5.0
+    ports:
+      # NOTE(review): presumably the TubeMQ master (8715) and broker (8123) client ports - confirm against the image docs
+      - "8715:8715"
+      - "8123:8123"
+      # web
+      - "8080:8080"
+      - "8081:8081"
diff --git a/installer/cli/deploy/standalone/tubemq/docker-compose.yml 
b/installer/cli/deploy/standalone/tubemq/docker-compose.yml
new file mode 100644
index 000000000..af677fd5c
--- /dev/null
+++ b/installer/cli/deploy/standalone/tubemq/docker-compose.yml
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Compose file for the TubeMQ service; no host ports are published here.
+version: "3.4"
+services:
+  tubemq:
+    image: inlong/tubemq-all:1.5.0
+    volumes:
+      # Named volume (declared below) mounted at /tubemq inside the container
+      - tubemq:/tubemq
+    logging:
+      # Keep container logs bounded: a single json-file log of at most 1m
+      driver: "json-file"
+      options:
+        max-size: "1m"
+        max-file: "1"
+    networks:
+      # Attach to the "spnet" network, which is declared external below
+      spnet:
+
+volumes:
+  tubemq:
+
+networks:
+  spnet:
+    # The network must already exist; compose will not create it
+    external: true
diff --git a/pom.xml b/pom.xml
index 2163b84e0..6d1dbf8b5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -183,6 +183,7 @@
        <snakeyaml.version>1.26</snakeyaml.version>
        <xerces.version>2.12.2</xerces.version>
        <zstd-jni.version>1.4.3-1</zstd-jni.version>
+       <inlong.version>1.5.0</inlong.version>
 
        <maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
        <maven.javadoc.plugin.version>3.1.1</maven.javadoc.plugin.version>
@@ -1358,6 +1359,11 @@
                 <artifactId>quartz</artifactId>
                 <version>${quartz.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.inlong</groupId>
+                <artifactId>tubemq-client</artifactId>
+                <version>${inlong.version}</version>
+            </dependency>
 
             <!-- dependency convergence -->
             <dependency>
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml 
b/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml
index 7d7b3faa4..74300e7c9 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml
@@ -143,6 +143,10 @@
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-client-java</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>tubemq-client</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.bouncycastle</groupId>
             <artifactId>bcprov-jdk14</artifactId>
diff --git 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
index 451812659..fab0dde2e 100644
--- 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
+++ 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/ConnectAdapterIiotInit.java
@@ -31,6 +31,7 @@ import 
org.apache.streampipes.connect.iiot.protocol.stream.HttpStreamProtocol;
 import org.apache.streampipes.connect.iiot.protocol.stream.KafkaProtocol;
 import org.apache.streampipes.connect.iiot.protocol.stream.MqttProtocol;
 import org.apache.streampipes.connect.iiot.protocol.stream.NatsProtocol;
+import org.apache.streampipes.connect.iiot.protocol.stream.TubeMQProtocol;
 import 
org.apache.streampipes.connect.iiot.protocol.stream.pulsar.PulsarProtocol;
 import 
org.apache.streampipes.connect.iiot.protocol.stream.rocketmq.RocketMQProtocol;
 import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
@@ -63,6 +64,7 @@ public class ConnectAdapterIiotInit extends 
ExtensionsModelSubmitter {
         .registerAdapter(new PulsarProtocol())
         .registerAdapter(new RocketMQProtocol())
         .registerAdapter(new HttpServerProtocol())
+        .registerAdapter(new TubeMQProtocol())
         .build();
   }
 }
diff --git 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/TubeMQProtocol.java
 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/TubeMQProtocol.java
new file mode 100644
index 000000000..7a042de33
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/TubeMQProtocol.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.iiot.protocol.stream;
+
+import org.apache.streampipes.extensions.api.connect.IAdapterPipeline;
+import org.apache.streampipes.extensions.api.connect.IFormat;
+import org.apache.streampipes.extensions.api.connect.IParser;
+import org.apache.streampipes.extensions.api.connect.IProtocol;
+import 
org.apache.streampipes.extensions.api.connect.exception.AdapterException;
+import org.apache.streampipes.extensions.api.connect.exception.ParseException;
+import org.apache.streampipes.extensions.management.connect.SendToPipeline;
+import 
org.apache.streampipes.extensions.management.connect.adapter.sdk.ParameterExtractor;
+import org.apache.streampipes.model.AdapterType;
+import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
+import org.apache.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder;
+import org.apache.streampipes.sdk.helpers.AdapterSourceType;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.inlong.tubemq.client.common.PeerInfo;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
+import org.apache.inlong.tubemq.client.consumer.MessageListener;
+import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+
+/**
+ * Adapter protocol that consumes events from an Apache TubeMQ (InLong) topic using a push consumer.
+ *
+ * <p>The master endpoints and the topic are passed to the {@link BrokerProtocol} super class
+ * (read back here as {@code brokerUrl} and {@code topic}); the consumer group is kept in this class.</p>
+ */
+public class TubeMQProtocol extends BrokerProtocol {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TubeMQProtocol.class);
+
+  public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.tubemq";
+
+  public static final String TOPIC_KEY = "tubemq-topic";
+  public static final String MASTER_HOST_AND_PORT_KEY = "tubemq-master-host-and-port";
+  public static final String CONSUMER_GROUP_KEY = "tubemq-consumer-group";
+
+  private String consumerGroup;
+
+  // Client resources of the running adapter; created in run() and released in stop().
+  private MessageSessionFactory messageSessionFactory;
+  private PushMessageConsumer pushConsumer;
+
+  /**
+   * No-arg constructor used when the protocol is registered (see declareModel/getInstance).
+   */
+  public TubeMQProtocol() {
+  }
+
+  private TubeMQProtocol(IParser parser, IFormat format, String masterHostAndPort, String topic,
+                         String consumerGroup) {
+    super(parser, format, masterHostAndPort, topic);
+    this.consumerGroup = consumerGroup;
+  }
+
+  /**
+   * Creates a configured protocol instance from the user-provided settings.
+   *
+   * @param protocolDescription description carrying the configured master endpoints, topic and consumer group
+   * @param parser              parser applied to every received message
+   * @param format              format handed to the pipeline sender
+   * @return a new {@link TubeMQProtocol} bound to the extracted settings
+   */
+  @Override
+  public IProtocol getInstance(ProtocolDescription protocolDescription, IParser parser, IFormat format) {
+    final ParameterExtractor extractor = new ParameterExtractor(protocolDescription.getConfig());
+
+    final String masterHostAndPort = extractor.singleValue(MASTER_HOST_AND_PORT_KEY, String.class);
+    final String topic = extractor.singleValue(TOPIC_KEY, String.class);
+    final String consumerGroup = extractor.singleValue(CONSUMER_GROUP_KEY, String.class);
+
+    return new TubeMQProtocol(parser, format, masterHostAndPort, topic, consumerGroup);
+  }
+
+  /**
+   * Declares the adapter: three required text parameters (master endpoints, topic, consumer group).
+   */
+  @Override
+  public ProtocolDescription declareModel() {
+    return ProtocolDescriptionBuilder.create(ID)
+        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+        .withLocales(Locales.EN)
+        .category(AdapterType.Generic)
+        .sourceType(AdapterSourceType.STREAM)
+        .requiredTextParameter(Labels.withId(MASTER_HOST_AND_PORT_KEY))
+        .requiredTextParameter(Labels.withId(TOPIC_KEY))
+        .requiredTextParameter(Labels.withId(CONSUMER_GROUP_KEY))
+        .build();
+  }
+
+  /**
+   * Subscribes to the configured topic, starting at the latest offset, and forwards every received
+   * message through the parser into the adapter pipeline.
+   *
+   * @param adapterPipeline pipeline that receives the parsed events
+   * @throws AdapterException if the TubeMQ client cannot be created or the subscription fails
+   */
+  @Override
+  public void run(IAdapterPipeline adapterPipeline) throws AdapterException {
+    final SendToPipeline sendToPipeline = new SendToPipeline(format, adapterPipeline);
+
+    final ConsumerConfig consumerConfig = new ConsumerConfig(brokerUrl, consumerGroup);
+    consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+
+    try {
+      messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
+      pushConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
+
+      pushConsumer.subscribe(topic, null, new MessageListener() {
+        @Override
+        public void receiveMessages(PeerInfo peerInfo, List<Message> messages) {
+          for (final Message message : messages) {
+            try {
+              // Decode explicitly as UTF-8; the platform default charset would corrupt
+              // non-ASCII payloads on hosts whose default is not UTF-8.
+              final String payload = new String(message.getData(), StandardCharsets.UTF_8);
+              parser.parse(IOUtils.toInputStream(payload, StandardCharsets.UTF_8), sendToPipeline);
+            } catch (ParseException e) {
+              // A single malformed message must not stop consumption; log it with its stack trace.
+              LOGGER.error("Error while parsing: " + e.getMessage(), e);
+            }
+          }
+        }
+
+        @Override
+        public Executor getExecutor() {
+          return null;
+        }
+
+        @Override
+        public void stop() {
+        }
+      });
+      pushConsumer.completeSubscribe();
+    } catch (TubeClientException e) {
+      // Release whatever was created before the failure so the client does not leak.
+      shutdown(messageSessionFactory, pushConsumer);
+      throw new AdapterException("Failed to create TubeMQ adapter.", e);
+    }
+  }
+
+  /**
+   * Stops the running adapter by shutting down the push consumer and the session factory.
+   */
+  @Override
+  public void stop() {
+    shutdown(messageSessionFactory, pushConsumer);
+  }
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  /**
+   * Reads the first {@code n} messages of the topic (consuming from the first offset) as raw bytes.
+   *
+   * <p>NOTE(review): this blocks until {@code n} messages have arrived; there is no timeout, so a topic
+   * holding fewer than {@code n} messages keeps the caller waiting until it is interrupted.</p>
+   *
+   * @param n number of messages to collect
+   * @return the collected message payloads, at most {@code n} entries
+   * @throws ParseException if the TubeMQ client fails or the waiting thread is interrupted
+   */
+  @Override
+  protected List<byte[]> getNByteElements(int n) throws ParseException {
+    final List<byte[]> elements = new ArrayList<>();
+
+    final ConsumerConfig consumerConfig = new ConsumerConfig(brokerUrl, consumerGroup);
+    consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_FIRST_OFFSET);
+
+    // Dedicated, short-lived client; deliberately separate from the instance fields used by run().
+    MessageSessionFactory previewSessionFactory = null;
+    PushMessageConsumer previewConsumer = null;
+    try {
+      previewSessionFactory = new TubeSingleSessionFactory(consumerConfig);
+      previewConsumer = previewSessionFactory.createPushConsumer(consumerConfig);
+
+      final CountDownLatch countDownLatch = new CountDownLatch(n);
+      previewConsumer.subscribe(topic, null, new MessageListener() {
+        @Override
+        public void receiveMessages(PeerInfo peerInfo, List<Message> messages) {
+          // Callbacks arrive on client threads: guard the list and make "check, add, count down"
+          // atomic so that never more than n elements are collected.
+          synchronized (elements) {
+            for (final Message message : messages) {
+              if (countDownLatch.getCount() == 0) {
+                return;
+              }
+              elements.add(message.getData());
+              countDownLatch.countDown();
+            }
+          }
+        }
+
+        @Override
+        public Executor getExecutor() {
+          return null;
+        }
+
+        @Override
+        public void stop() {
+        }
+      });
+      previewConsumer.completeSubscribe();
+      countDownLatch.await();
+    } catch (TubeClientException e) {
+      throw new ParseException("Failed to getNByteElements", e);
+    } catch (InterruptedException e) {
+      // Preserve the interrupt for callers further up the stack.
+      Thread.currentThread().interrupt();
+      throw new ParseException("Failed to getNByteElements", e);
+    } finally {
+      shutdown(previewSessionFactory, previewConsumer);
+    }
+
+    synchronized (elements) {
+      return new ArrayList<>(elements);
+    }
+  }
+
+  /**
+   * Best-effort shutdown of a consumer and its session factory; failures are logged, never thrown.
+   *
+   * @param messageSessionFactory factory to shut down, may be {@code null}
+   * @param pushConsumer          consumer to shut down, may be {@code null}
+   */
+  private static void shutdown(MessageSessionFactory messageSessionFactory, PushMessageConsumer pushConsumer) {
+    if (pushConsumer != null && !pushConsumer.isShutdown()) {
+      try {
+        pushConsumer.shutdown();
+      } catch (Throwable ex) {
+        LOGGER.error("Failed to shut down TubeMQ push consumer.", ex);
+      }
+    }
+
+    if (messageSessionFactory != null) {
+      try {
+        messageSessionFactory.shutdown();
+      } catch (TubeClientException ex) {
+        LOGGER.error("Failed to shut down TubeMQ message session factory.", ex);
+      }
+    }
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/documentation.md
 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/documentation.md
new file mode 100644
index 000000000..dbc91d9c2
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/documentation.md
@@ -0,0 +1,48 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+## Apache TubeMQ (InLong)
+
+<p align="center"> 
+    <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
+
+Consumes messages from an Apache TubeMQ broker.
+
+***
+
+## Configuration
+
+### TubeMQ Master Information
+
+This field describes the endpoints of all the TubeMQ masters.
+
+The format should be like `ip1:port1,ip2:port2,ip3:port3`.
+
+### TubeMQ Topic
+
+The topic from which events should be consumed.
+
+### TubeMQ Consumer Group
+
+The consumer group of the TubeMQ Consumer.
+
diff --git 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/icon.png
 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/icon.png
new file mode 100644
index 000000000..7c0fde0bc
Binary files /dev/null and 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/icon.png
 differ
diff --git 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/strings.en
 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/strings.en
new file mode 100644
index 000000000..d8eadb8d0
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.tubemq/strings.en
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+org.apache.streampipes.connect.iiot.protocol.stream.tubemq.title=Apache TubeMQ 
(InLong)
+org.apache.streampipes.connect.iiot.protocol.stream.tubemq.description=Consumes
 messages from an Apache TubeMQ (InLong) broker.
+
+tubemq-master-host-and-port.title=TubeMQ Master Information
+tubemq-master-host-and-port.description=This field describes the endpoints of 
all the TubeMQ masters. The format should be like 
"ip1:port1,ip2:port2,ip3:port3".
+
+tubemq-consumer-group.title=TubeMQ Consumer Group
+tubemq-consumer-group.description=The consumer group of the TubeMQ Consumer
+
+tubemq-topic.title=TubeMQ Topic
+tubemq-topic.description=Select a TubeMQ topic
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml
index fb63ad3c0..cf3ce5d56 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml
@@ -88,6 +88,10 @@
             <artifactId>jnats</artifactId>
             <version>${nats.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>tubemq-client</artifactId>
+        </dependency>
 
         <!-- dependency convergence -->
         <dependency>
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
index a6ccb66f4..e242e18e4 100644
--- 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
+++ 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
@@ -37,6 +37,7 @@ import 
org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink;
 import org.apache.streampipes.sinks.brokers.jvm.rabbitmq.RabbitMqController;
 import org.apache.streampipes.sinks.brokers.jvm.rest.RestController;
 import org.apache.streampipes.sinks.brokers.jvm.rocketmq.RocketMQPublisherSink;
+import org.apache.streampipes.sinks.brokers.jvm.tubemq.TubeMQPublisherSink;
 import org.apache.streampipes.sinks.brokers.jvm.websocket.WebsocketServerSink;
 
 public class BrokersJvmInit extends ExtensionsModelSubmitter {
@@ -61,6 +62,7 @@ public class BrokersJvmInit extends ExtensionsModelSubmitter {
             new WebsocketServerSink(),
             new PulsarPublisherSink(),
             new RocketMQPublisherSink(),
+            new TubeMQPublisherSink(),
             new NatsController())
         .registerMessagingFormats(
             new JsonDataFormatFactory(),
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQParameters.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQParameters.java
new file mode 100644
index 000000000..9a81f8b51
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQParameters.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.brokers.jvm.tubemq;
+
+import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+
+import static 
org.apache.streampipes.sinks.brokers.jvm.tubemq.TubeMQPublisherSink.MASTER_HOST_AND_PORT_KEY;
+import static 
org.apache.streampipes.sinks.brokers.jvm.tubemq.TubeMQPublisherSink.TOPIC_KEY;
+
+/**
+ * Immutable holder for the user-configured settings of the TubeMQ sink:
+ * the master endpoints and the target topic.
+ */
+public class TubeMQParameters {
+
+  private final String masterHostAndPort;
+  private final String topic;
+
+  /**
+   * Reads both settings from the sink invocation parameters.
+   *
+   * @param parameters sink parameters whose extractor provides the configured values
+   */
+  public TubeMQParameters(SinkParams parameters) {
+    final DataSinkParameterExtractor sinkExtractor = parameters.extractor();
+
+    masterHostAndPort = sinkExtractor.singleValueParameter(MASTER_HOST_AND_PORT_KEY, String.class);
+    topic = sinkExtractor.singleValueParameter(TOPIC_KEY, String.class);
+  }
+
+  /** @return the configured TubeMQ master endpoints */
+  public String getMasterHostAndPort() {
+    return this.masterHostAndPort;
+  }
+
+  /** @return the configured TubeMQ topic */
+  public String getTopic() {
+    return this.topic;
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQPublisherSink.java
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQPublisherSink.java
new file mode 100644
index 000000000..deeb2dcc3
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/tubemq/TubeMQPublisherSink.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.brokers.jvm.tubemq;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.SpDataFormatDefinition;
+import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
+import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.sdk.builder.DataSinkBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
+
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.client.producer.MessageProducer;
+import org.apache.inlong.tubemq.client.producer.MessageSentResult;
+import org.apache.inlong.tubemq.corebase.Message;
+
+import java.util.Map;
+
+public class TubeMQPublisherSink extends StreamPipesDataSink {
+
+  public static final String MASTER_HOST_AND_PORT_KEY = 
"tubemq-master-host-and-port";
+  public static final String TOPIC_KEY = "tubemq-topic";
+
+  private SpDataFormatDefinition spDataFormatDefinition;
+  private String topic;
+
+  private MessageProducer messageProducer;
+
+  @Override
+  public DataSinkDescription declareModel() {
+    return 
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.tubemq").category(DataSinkType.MESSAGING)
+        .withLocales(Locales.EN).withAssets(Assets.DOCUMENTATION, Assets.ICON)
+        
.requiredStream(StreamRequirementsBuilder.create().requiredProperty(EpRequirements.anyProperty()).build())
+        
.requiredTextParameter(Labels.withId(MASTER_HOST_AND_PORT_KEY)).requiredTextParameter(Labels.withId(TOPIC_KEY))
+        .build();
+  }
+
+  @Override
+  public void onInvocation(SinkParams sinkParams, EventSinkRuntimeContext 
runtimeContext) throws SpRuntimeException {
+    final TubeMQParameters tubeMQParameters = new TubeMQParameters(sinkParams);
+
+    spDataFormatDefinition = new JsonDataFormatDefinition();
+    topic = tubeMQParameters.getTopic();
+
+    final TubeClientConfig tubeClientConfig = new 
TubeClientConfig(tubeMQParameters.getMasterHostAndPort());
+    try {
+      messageProducer = new 
TubeSingleSessionFactory(tubeClientConfig).createProducer();
+      messageProducer.publish(topic);
+    } catch (TubeClientException e) {
+      throw new SpRuntimeException(e);
+    }
+  }
+
+  @Override
+  public void onEvent(Event event) throws SpRuntimeException {
+    final Map<String, Object> eventRawMap = event.getRaw();
+    final byte[] eventMessage = spDataFormatDefinition.fromMap(eventRawMap);
+    final Message tubemqMessage = new Message(topic, eventMessage);
+
+    try {
+      final MessageSentResult result = 
messageProducer.sendMessage(tubemqMessage);
+      if (!result.isSuccess()) {
+        throw new SpRuntimeException(
+            String.format("Failed to send message: %s, because: %s", 
tubemqMessage, result.getErrMsg()));
+      }
+    } catch (TubeClientException | InterruptedException e) {
+      throw new SpRuntimeException(e);
+    }
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+    try {
+      messageProducer.shutdown();
+    } catch (Throwable e) {
+      throw new SpRuntimeException(e);
+    }
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/documentation.md
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/documentation.md
new file mode 100644
index 000000000..6971c693d
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/documentation.md
@@ -0,0 +1,55 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+## TubeMQ (InLong) Publisher
+
+<p align="center"> 
+    <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
+
+Publishes events to Apache TubeMQ (InLong).
+
+***
+
+## Required Inputs
+
+This sink does not have any requirements and works with any incoming event 
type.
+
+***
+
+## Configuration
+
+### TubeMQ Master Information
+
+This field describes the endpoints of all the TubeMQ masters. 
+
+The format should be like `ip1:port1,ip2:port2,ip3:port3`.
+
+
+### TubeMQ Topic
+
+The topic where events should be sent to.
+
+
+## Output
+
+(not applicable for data sinks)
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/icon.png
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/icon.png
new file mode 100644
index 000000000..7c0fde0bc
Binary files /dev/null and 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/icon.png
 differ
diff --git 
a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/strings.en
 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/strings.en
new file mode 100644
index 000000000..9ac5b877b
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.tubemq/strings.en
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.streampipes.sinks.brokers.jvm.tubemq.title=TubeMQ (InLong) Publisher
+org.apache.streampipes.sinks.brokers.jvm.tubemq.description=Publish events to 
Apache TubeMQ (InLong)
+
+tubemq-master-host-and-port.title=TubeMQ Master Information
+tubemq-master-host-and-port.description=This field describes the endpoints of 
all the TubeMQ masters. The format should be like 
"ip1:port1,ip2:port2,ip3:port3".
+
+tubemq-topic.title=TubeMQ Topic
+tubemq-topic.description=Select a TubeMQ topic


Reply via email to