This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c388f7c  [INLONG-1893] Inlong-Sort-Standalone support to sort the 
events to Pulsar clusters. (#1942)
c388f7c is described below

commit c388f7c805a7287456678e991069ca21ed0741d6
Author: 卢春亮 <[email protected]>
AuthorDate: Mon Dec 13 14:55:41 2021 +0800

    [INLONG-1893] Inlong-Sort-Standalone support to sort the events to Pulsar 
clusters. (#1942)
---
 inlong-dataproxy/dataproxy-source/pom.xml          |  94 +++++-------
 .../pulsar/federation/PulsarProducerCluster.java   |  19 ++-
 inlong-dataproxy/pom.xml                           |  19 +++
 inlong-sort-standalone/pom.xml                     |  12 ++
 .../sink/pulsar/PulsarFederationSink.java          | 102 +++++++++++++
 .../sink/pulsar/PulsarFederationSinkContext.java   | 122 +++++++++++++++
 .../sink/pulsar/PulsarFederationWorker.java        | 157 ++++++++++++++++++++
 .../sink/pulsar}/PulsarProducerCluster.java        |  39 ++---
 .../sink/pulsar/PulsarProducerFederation.java      | 165 +++++++++++++++++++++
 9 files changed, 652 insertions(+), 77 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/pom.xml 
b/inlong-dataproxy/dataproxy-source/pom.xml
index 5760efb..ba75eef 100644
--- a/inlong-dataproxy/dataproxy-source/pom.xml
+++ b/inlong-dataproxy/dataproxy-source/pom.xml
@@ -1,59 +1,47 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor 
-       license agreements. See the NOTICE file distributed with this work for 
additional 
-       information regarding copyright ownership. The ASF licenses this file 
to 
-       you under the Apache License, Version 2.0 (the "License"); you may not 
use 
-       this file except in compliance with the License. You may obtain a copy 
of 
-       the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required 
-       by applicable law or agreed to in writing, software distributed under 
the 
-       License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS 
-       OF ANY KIND, either express or implied. See the License for the 
specific 
-       language governing permissions and limitations under the License. -->
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
-       <parent>
-               <groupId>org.apache.inlong</groupId>
-               <artifactId>inlong-dataproxy</artifactId>
-               <version>0.12.0-incubating-SNAPSHOT</version>
-       </parent>
-       <modelVersion>4.0.0</modelVersion>
-       <name>Apache InLong - DataProxy Source</name>
-       <artifactId>dataproxy-source</artifactId>
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>inlong-dataproxy</artifactId>
+        <version>0.12.0-incubating-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <name>Apache InLong - DataProxy Source</name>
+    <artifactId>dataproxy-source</artifactId>
 
-       <properties>
-               
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-               <compiler.source>1.8</compiler.source>
-               <compiler.target>1.8</compiler.target>
-               <guava.version>19.0</guava.version>
-               <powermock.version>2.0.9</powermock.version>
-               <skipTests>false</skipTests>
-       </properties>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <compiler.source>1.8</compiler.source>
+        <compiler.target>1.8</compiler.target>
+    </properties>
 
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.inlong</groupId>
-                       <artifactId>tubemq-client</artifactId>
-                       <version>${project.version}</version>
-                       <scope>compile</scope>
-               </dependency>
-               <dependency>
-                       <groupId>com.google.guava</groupId>
-                       <artifactId>guava</artifactId>
-                       <version>${guava.version}</version>
-               </dependency>
-               <dependency>
-                       <groupId>org.powermock</groupId>
-                       <artifactId>powermock-module-junit4</artifactId>
-                       <version>${powermock.version}</version>
-                       <scope>test</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.powermock</groupId>
-                       <artifactId>powermock-api-mockito2</artifactId>
-                       <version>${powermock.version}</version>
-                       <scope>test</scope>
-               </dependency>
-       </dependencies>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>tubemq-client</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
index 69a8ee1..4c82144 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.dataproxy.sink.pulsar.federation;
 
+import java.security.SecureRandom;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -204,8 +205,11 @@ public class PulsarProducerCluster implements 
LifecycleAware {
         if (producer == null) {
             try {
                 LOG.info("try to new a object for topic " + topic);
+                SecureRandom secureRandom = new SecureRandom(
+                        (workerName + "-" + cacheClusterName + "-" + topic + 
System.currentTimeMillis()).getBytes());
+                String producerName = workerName + "-" + cacheClusterName + 
"-" + topic + "-" + secureRandom.nextLong();
                 producer = baseBuilder.clone().topic(topic)
-                        .producerName(workerName + "-" + cacheClusterName + 
"-" + topic)
+                        .producerName(producerName)
                         .create();
                 LOG.info("create new producer success:{}", 
producer.getProducerName());
                 Producer<byte[]> oldProducer = 
this.producerMap.putIfAbsent(topic, producer);
@@ -224,14 +228,17 @@ public class PulsarProducerCluster implements 
LifecycleAware {
             this.addMetric(event, topic, false, 0);
             return false;
         }
+        // sendAsync
+        CompletableFuture<MessageId> future = null;
         String messageKey = headers.get(Constants.MESSAGE_KEY);
+        long sendTime = System.currentTimeMillis();
         if (messageKey == null) {
-            messageKey = headers.get(Constants.HEADER_KEY_SOURCE_IP);
+            future = producer.newMessage().properties(headers)
+                    .value(event.getBody()).sendAsync();
+        } else {
+            future = producer.newMessage().key(messageKey).properties(headers)
+                    .value(event.getBody()).sendAsync();
         }
-        // sendAsync
-        long sendTime = System.currentTimeMillis();
-        CompletableFuture<MessageId> future = 
producer.newMessage().key(messageKey).properties(headers)
-                .value(event.getBody()).sendAsync();
         // callback
         future.whenCompleteAsync((msgId, ex) -> {
             if (ex != null) {
diff --git a/inlong-dataproxy/pom.xml b/inlong-dataproxy/pom.xml
index 791760f..80926ad 100644
--- a/inlong-dataproxy/pom.xml
+++ b/inlong-dataproxy/pom.xml
@@ -48,6 +48,8 @@
         <inlong-common.version>1.3.4</inlong-common.version>
         <pulsar.version>2.8.1</pulsar.version>
         <testng.version>6.14.3</testng.version>
+        <guava.version>19.0</guava.version>
+        <powermock.version>2.0.9</powermock.version>
     </properties>
 
     <dependencies>
@@ -107,6 +109,23 @@
             <artifactId>testng</artifactId>
             <version>${testng.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito2</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/inlong-sort-standalone/pom.xml b/inlong-sort-standalone/pom.xml
index 0672876..ecb04b1 100644
--- a/inlong-sort-standalone/pom.xml
+++ b/inlong-sort-standalone/pom.xml
@@ -50,6 +50,7 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <compiler.source>1.8</compiler.source>
         <compiler.target>1.8</compiler.target>
+        <pulsar.version>2.7.2</pulsar.version>
     </properties>
 
     <dependencies>
@@ -84,6 +85,17 @@
             <version>${guava.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client</artifactId>
+            <version>${pulsar.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>guava</artifactId>
+                    <groupId>com.google.guava</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.powermock</groupId>
             <artifactId>powermock-module-junit4</artifactId>
             <version>${powermock.version}</version>
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSink.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSink.java
new file mode 100644
index 0000000..f069863
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSink.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.pulsar;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flume.Context;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.slf4j.Logger;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+/**
+ * 
+ * PulsarFederationSink
+ */
+public class PulsarFederationSink extends AbstractSink implements Configurable 
{
+
+    public static final Logger LOG = 
InlongLoggerFactory.getLogger(PulsarFederationSink.class);
+
+    private PulsarFederationSinkContext context;
+    private List<PulsarFederationWorker> workers = new ArrayList<>();
+    private Map<String, String> dimensions;
+
+    /**
+     * start
+     */
+    @Override
+    public void start() {
+        String sinkName = this.getName();
+        // create worker
+        for (int i = 0; i < context.getMaxThreads(); i++) {
+            PulsarFederationWorker worker = new 
PulsarFederationWorker(sinkName, i, context);
+            worker.start();
+            this.workers.add(worker);
+        }
+        super.start();
+    }
+
+    /**
+     * stop
+     */
+    @Override
+    public void stop() {
+        for (PulsarFederationWorker worker : workers) {
+            try {
+                worker.close();
+            } catch (Throwable e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        this.context.close();
+        super.stop();
+    }
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        LOG.info("start to configure:{}, context:{}.", 
this.getClass().getSimpleName(), context.toString());
+        this.context = new PulsarFederationSinkContext(getName(), context, 
getChannel());
+        this.context.start();
+        this.dimensions = new HashMap<>();
+        this.dimensions.put(SortMetricItem.KEY_CLUSTER_ID, 
this.context.getClusterId());
+        this.dimensions.put(SortMetricItem.KEY_TASK_NAME, 
this.context.getTaskName());
+        this.dimensions.put(SortMetricItem.KEY_SINK_ID, 
this.context.getSinkName());
+    }
+
+    /**
+     * process
+     * 
+     * @return                        Status
+     * @throws EventDeliveryException
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        return Status.BACKOFF;
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
new file mode 100644
index 0000000..9142c2c
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.pulsar;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.Constants;
+import org.slf4j.Logger;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+/**
+ * 
+ * PulsarFederationSinkContext
+ */
+public class PulsarFederationSinkContext extends SinkContext {
+
+    public static final Logger LOG = 
InlongLoggerFactory.getLogger(PulsarFederationSinkContext.class);
+
+    private Context producerContext;
+    private Map<String, String> idTopicMap = new ConcurrentHashMap<>();
+    private List<CacheClusterConfig> clusterConfigList = new ArrayList<>();
+
+    /**
+     * Constructor
+     * 
+     * @param sinkName
+     * @param context
+     * @param channel
+     */
+    public PulsarFederationSinkContext(String sinkName, Context context, 
Channel channel) {
+        super(sinkName, context, channel);
+    }
+
+    /**
+     * reload
+     */
+    public void reload() {
+        super.reload();
+        try {
+            SortTaskConfig newSortTaskConfig = 
SortClusterConfigHolder.getTaskConfig(taskName);
+            if (this.sortTaskConfig != null && 
this.sortTaskConfig.equals(newSortTaskConfig)) {
+                return;
+            }
+            this.sortTaskConfig = newSortTaskConfig;
+            this.producerContext = new 
Context(this.sortTaskConfig.getSinkParams());
+            // parse the config of id and topic
+            Map<String, String> newIdTopicMap = new ConcurrentHashMap<>();
+            List<Map<String, String>> idList = 
this.sortTaskConfig.getIdParams();
+            for (Map<String, String> idParam : idList) {
+                String inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
+                String inlongStreamId = 
idParam.get(Constants.INLONG_STREAM_ID);
+                String uid = InlongId.generateUid(inlongGroupId, 
inlongStreamId);
+                String topic = idParam.getOrDefault(Constants.TOPIC, uid);
+                newIdTopicMap.put(uid, topic);
+            }
+            // build cache cluster config
+            CacheClusterConfig clusterConfig = new CacheClusterConfig();
+            clusterConfig.setClusterName(this.taskName);
+            clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
+            List<CacheClusterConfig> newClusterConfigList = new ArrayList<>();
+            newClusterConfigList.add(clusterConfig);
+            // change current config
+            this.idTopicMap = newIdTopicMap;
+            this.clusterConfigList = newClusterConfigList;
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * get producerContext
+     * 
+     * @return the producerContext
+     */
+    public Context getProducerContext() {
+        return producerContext;
+    }
+
+    /**
+     * getTopic
+     * 
+     * @param  uid
+     * @return
+     */
+    public String getTopic(String uid) {
+        return this.idTopicMap.get(uid);
+    }
+
+    /**
+     * getCacheClusters
+     * 
+     * @return
+     */
+    public List<CacheClusterConfig> getCacheClusters() {
+        return this.clusterConfigList;
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java
new file mode 100644
index 0000000..bf2a836
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.pulsar;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.apache.inlong.sort.standalone.utils.Constants;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * 
+ * PulsarFederationWorker
+ */
+public class PulsarFederationWorker extends Thread {
+
+    public static final Logger LOG = 
InlongLoggerFactory.getLogger(PulsarFederationWorker.class);
+
+    private final String workerName;
+    private final PulsarFederationSinkContext context;
+
+    private PulsarProducerFederation producerFederation;
+    private LifecycleState status;
+    private Map<String, String> dimensions;
+
+    /**
+     * Constructor
+     * 
+     * @param sinkName
+     * @param workerIndex
+     * @param context
+     */
+    public PulsarFederationWorker(String sinkName, int workerIndex, 
PulsarFederationSinkContext context) {
+        super();
+        this.workerName = sinkName + "-worker-" + workerIndex;
+        this.context = context;
+        this.producerFederation = new PulsarProducerFederation(workerName, 
this.context);
+        this.status = LifecycleState.IDLE;
+        this.dimensions = new HashMap<>();
+        this.dimensions.put(SortMetricItem.KEY_CLUSTER_ID, 
this.context.getClusterId());
+        this.dimensions.put(SortMetricItem.KEY_TASK_NAME, 
this.context.getTaskName());
+        this.dimensions.put(SortMetricItem.KEY_SINK_ID, 
this.context.getSinkName());
+    }
+
+    /**
+     * start
+     */
+    @Override
+    public void start() {
+        this.producerFederation.start();
+        this.status = LifecycleState.START;
+        super.start();
+    }
+
+    /**
+     * 
+     * close
+     */
+    public void close() {
+        // close all producers
+        this.producerFederation.close();
+        this.status = LifecycleState.STOP;
+    }
+
+    /**
+     * run
+     */
+    @Override
+    public void run() {
+        LOG.info(String.format("start PulsarSetWorker:%s", this.workerName));
+        while (status != LifecycleState.STOP) {
+            Channel channel = context.getChannel();
+            Transaction tx = channel.getTransaction();
+            tx.begin();
+            try {
+                Event event = channel.take();
+                if (event == null) {
+                    tx.commit();
+                    sleepOneInterval();
+                    continue;
+                }
+                // fill topic
+                this.fillTopic(event);
+                // metric
+                SortMetricItem.fillInlongId(event, dimensions);
+                this.dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, 
event.getHeaders().get(Constants.TOPIC));
+                SortMetricItem metricItem = 
this.context.getMetricItemSet().findMetricItem(dimensions);
+                metricItem.sendCount.incrementAndGet();
+                metricItem.sendSize.addAndGet(event.getBody().length);
+                // send
+                this.producerFederation.send(event, tx);
+            } catch (Throwable t) {
+                LOG.error("Process event failed!" + this.getName(), t);
+                try {
+                    tx.rollback();
+                    tx.close();
+                    // metric
+                    SortMetricItem metricItem = 
this.context.getMetricItemSet().findMetricItem(dimensions);
+                    metricItem.readFailCount.incrementAndGet();
+                    sleepOneInterval();
+                } catch (Throwable e) {
+                    LOG.error("Channel take transaction rollback exception:" + 
getName(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * fillTopic
+     * 
+     * @param currentRecord
+     */
+    private void fillTopic(Event currentRecord) {
+        Map<String, String> headers = currentRecord.getHeaders();
+        String inlongGroupId = headers.get(Constants.INLONG_GROUP_ID);
+        String inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
+        String uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+        String topic = this.context.getTopic(uid);
+        if (!StringUtils.isBlank(topic)) {
+            headers.put(Constants.TOPIC, topic);
+        }
+    }
+
+    /**
+     * sleepOneInterval
+     */
+    private void sleepOneInterval() {
+        try {
+            Thread.sleep(context.getProcessInterval());
+        } catch (InterruptedException e1) {
+            LOG.error(e1.getMessage(), e1);
+        }
+    }
+}
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
similarity index 90%
copy from 
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
copy to 
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
index 69a8ee1..1e7a5e6 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.dataproxy.sink.pulsar.federation;
+package org.apache.inlong.sort.standalone.sink.pulsar;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -26,11 +26,13 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.Transaction;
 import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
-import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.apache.inlong.sort.standalone.utils.Constants;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
@@ -42,7 +44,6 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.shade.org.apache.commons.lang.math.NumberUtils;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * 
@@ -50,7 +51,7 @@ import org.slf4j.LoggerFactory;
  */
 public class PulsarProducerCluster implements LifecycleAware {
 
-    public static final Logger LOG = 
LoggerFactory.getLogger(PulsarProducerCluster.class);
+    public static final Logger LOG = 
InlongLoggerFactory.getLogger(PulsarProducerCluster.class);
 
     public static final String KEY_SERVICE_URL = "serviceUrl";
     public static final String KEY_AUTHENTICATION = "authentication";
@@ -113,8 +114,6 @@ public class PulsarProducerCluster implements 
LifecycleAware {
                     
.authentication(AuthenticationFactory.token(authentication))
                     .build();
             this.baseBuilder = client.newProducer();
-//            Map<String, Object> builderConf = new HashMap<>();
-//            builderConf.putAll(context.getParameters());
             this.baseBuilder
                     .hashingScheme(HashingScheme.Murmur3_32Hash)
                     .enableBatching(context.getBoolean(KEY_ENABLEBATCHING, 
true))
@@ -165,7 +164,7 @@ public class PulsarProducerCluster implements 
LifecycleAware {
     @Override
     public void stop() {
         this.state = LifecycleState.STOP;
-        //
+        // close producer
         for (Entry<String, Producer<byte[]>> entry : 
this.producerMap.entrySet()) {
             try {
                 entry.getValue().close();
@@ -194,8 +193,9 @@ public class PulsarProducerCluster implements 
LifecycleAware {
      * send
      * 
      * @param event
+     * @param tx
      */
-    public boolean send(Event event) {
+    public boolean send(Event event, Transaction tx) {
         // send
         Map<String, String> headers = event.getHeaders();
         String topic = headers.get(Constants.TOPIC);
@@ -220,7 +220,8 @@ public class PulsarProducerCluster implements 
LifecycleAware {
         }
         // create producer failed
         if (producer == null) {
-            sinkContext.getBufferQueue().release(event.getBody().length);
+            tx.rollback();
+            tx.close();
             this.addMetric(event, topic, false, 0);
             return false;
         }
@@ -237,10 +238,12 @@ public class PulsarProducerCluster implements 
LifecycleAware {
             if (ex != null) {
                 LOG.error("Send fail:{}", ex.getMessage());
                 LOG.error(ex.getMessage(), ex);
-                sinkContext.getBufferQueue().offer(event);
+                tx.rollback();
+                tx.close();
                 this.addMetric(event, topic, false, 0);
             } else {
-                sinkContext.getBufferQueue().release(event.getBody().length);
+                tx.commit();
+                tx.close();
                 this.addMetric(event, topic, true, sendTime);
             }
         });
@@ -257,12 +260,12 @@ public class PulsarProducerCluster implements 
LifecycleAware {
      */
     private void addMetric(Event currentRecord, String topic, boolean result, 
long sendTime) {
         Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, 
this.sinkContext.getProxyClusterId());
+        dimensions.put(SortMetricItem.KEY_CLUSTER_ID, 
this.sinkContext.getClusterId());
         // metric
-        DataProxyMetricItem.fillInlongId(currentRecord, dimensions);
-        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.cacheClusterName);
-        dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
-        DataProxyMetricItem metricItem = 
this.sinkContext.getMetricItemSet().findMetricItem(dimensions);
+        SortMetricItem.fillInlongId(currentRecord, dimensions);
+        dimensions.put(SortMetricItem.KEY_SINK_ID, this.cacheClusterName);
+        dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, topic);
+        SortMetricItem metricItem = 
this.sinkContext.getMetricItemSet().findMetricItem(dimensions);
         if (result) {
             metricItem.sendSuccessCount.incrementAndGet();
             
metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
new file mode 100644
index 0000000..c914fc8
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.pulsar;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
+import org.slf4j.Logger;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+/**
+ * PulsarProducerFederation: keeps one PulsarProducerCluster per cache cluster config from the sink
+ * context, refreshes that set on a timer, and sends each event to a cluster chosen by a rotating index.
+ */
+public class PulsarProducerFederation {
+
+    public static final Logger LOG = 
InlongLoggerFactory.getLogger(PulsarProducerFederation.class);
+
+    private final String workerName;
+    private final PulsarFederationSinkContext context;
+    private Timer reloadTimer;
+
+    private List<PulsarProducerCluster> clusterList = new ArrayList<>();
+    private List<PulsarProducerCluster> deletingClusterList = new 
ArrayList<>();
+
+    private AtomicInteger clusterIndex = new AtomicInteger(0);
+
+    /**
+     * Stores the worker name and sink context; no cluster is created until start() or reload() runs.
+     * 
+     * @param workerName name passed to every PulsarProducerCluster this federation creates
+     * @param context    sink context that supplies the cache cluster configs and the reload interval
+     */
+    public PulsarProducerFederation(String workerName, 
PulsarFederationSinkContext context) {
+        this.workerName = workerName;
+        this.context = context;
+    }
+
+    /**
+     * Runs reload() once right away, then arms the periodic reload timer; exceptions are logged, not thrown.
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Cancels the reload timer, then stops each cluster in clusterList (deletingClusterList is not stopped).
+     */
+    public void close() {
+        try {
+            this.reloadTimer.cancel();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+        for (PulsarProducerCluster cluster : this.clusterList) {
+            cluster.stop();
+        }
+    }
+
+    /**
+     * Creates a daemon Timer that calls reload() every context.getReloadInterval() ms, first run one interval out.
+     */
+    private void setReloadTimer() {
+        reloadTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+
+            public void run() {
+                reload();
+            }
+        };
+        reloadTimer.schedule(task, new Date(System.currentTimeMillis() + 
context.getReloadInterval()),
+                context.getReloadInterval());
+    }
+
+    /**
+     * Syncs clusterList with context.getCacheClusters(); clusters dropped here are stopped on the next reload.
+     */
+    public void reload() {
+        try {
+            // stop the clusters that the previous reload took out of clusterList
+            deletingClusterList.forEach(item -> {
+                item.stop();
+            });
+            deletingClusterList.clear();
+            // fetch the current cluster configs from the sink context
+            List<CacheClusterConfig> configList = 
this.context.getCacheClusters();
+            List<PulsarProducerCluster> newClusterList = new 
ArrayList<>(configList.size());
+            // collect the configured names and the currently held names for the comparison below
+            Set<String> newClusterNames = new HashSet<>();
+            configList.forEach(item -> {
+                newClusterNames.add(item.getClusterName());
+            });
+            Set<String> oldClusterNames = new HashSet<>();
+            clusterList.forEach(item -> {
+                oldClusterNames.add(item.getCacheClusterName());
+            });
+            // create and start a cluster for each configured name that is not held yet
+            for (CacheClusterConfig config : configList) {
+                if (!oldClusterNames.contains(config.getClusterName())) {
+                    PulsarProducerCluster cluster = new 
PulsarProducerCluster(workerName, config, context);
+                    cluster.start();
+                    newClusterList.add(cluster);
+                }
+            }
+            // keep clusters that are still configured; queue the rest to be stopped by the next reload
+            for (PulsarProducerCluster cluster : this.clusterList) {
+                if (newClusterNames.contains(cluster.getCacheClusterName())) {
+                    newClusterList.add(cluster);
+                } else {
+                    deletingClusterList.add(cluster);
+                }
+            }
+            this.clusterList = newClusterList;
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Forwards event and tx to the cluster at (clusterIndex % list size); an empty list would divide by zero.
+     * @param event event passed through to the selected cluster
+     * @param tx transaction passed through to the selected cluster
+     * @return the boolean returned by the selected cluster's send(event, tx)
+     */
+    public boolean send(Event event, Transaction tx) {
+        int currentIndex = clusterIndex.getAndIncrement();
+        if (currentIndex > Integer.MAX_VALUE / 2) {
+            clusterIndex.set(0);
+        }
+        List<PulsarProducerCluster> currentClusterList = this.clusterList;
+        int currentSize = currentClusterList.size();
+        int realIndex = currentIndex % currentSize;
+        PulsarProducerCluster clusterProducer = 
currentClusterList.get(realIndex);
+        return clusterProducer.send(event, tx);
+    }
+}

Reply via email to