This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c388f7c [INLONG-1893] Inlong-Sort-Standalone support to sort the
events to Pulsar clusters. (#1942)
c388f7c is described below
commit c388f7c805a7287456678e991069ca21ed0741d6
Author: 卢春亮 <[email protected]>
AuthorDate: Mon Dec 13 14:55:41 2021 +0800
[INLONG-1893] Inlong-Sort-Standalone support to sort the events to Pulsar
clusters. (#1942)
---
inlong-dataproxy/dataproxy-source/pom.xml | 94 +++++-------
.../pulsar/federation/PulsarProducerCluster.java | 19 ++-
inlong-dataproxy/pom.xml | 19 +++
inlong-sort-standalone/pom.xml | 12 ++
.../sink/pulsar/PulsarFederationSink.java | 102 +++++++++++++
.../sink/pulsar/PulsarFederationSinkContext.java | 122 +++++++++++++++
.../sink/pulsar/PulsarFederationWorker.java | 157 ++++++++++++++++++++
.../sink/pulsar}/PulsarProducerCluster.java | 39 ++---
.../sink/pulsar/PulsarProducerFederation.java | 165 +++++++++++++++++++++
9 files changed, 652 insertions(+), 77 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/pom.xml
b/inlong-dataproxy/dataproxy-source/pom.xml
index 5760efb..ba75eef 100644
--- a/inlong-dataproxy/dataproxy-source/pom.xml
+++ b/inlong-dataproxy/dataproxy-source/pom.xml
@@ -1,59 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more
contributor
- license agreements. See the NOTICE file distributed with this work for
additional
- information regarding copyright ownership. The ASF licenses this file
to
- you under the Apache License, Version 2.0 (the "License"); you may not
use
- this file except in compliance with the License. You may obtain a copy
of
- the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
required
- by applicable law or agreed to in writing, software distributed under
the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS
- OF ANY KIND, either express or implied. See the License for the
specific
- language governing permissions and limitations under the License. -->
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>org.apache.inlong</groupId>
- <artifactId>inlong-dataproxy</artifactId>
- <version>0.12.0-incubating-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
- <name>Apache InLong - DataProxy Source</name>
- <artifactId>dataproxy-source</artifactId>
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>inlong-dataproxy</artifactId>
+ <version>0.12.0-incubating-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <name>Apache InLong - DataProxy Source</name>
+ <artifactId>dataproxy-source</artifactId>
- <properties>
-
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <compiler.source>1.8</compiler.source>
- <compiler.target>1.8</compiler.target>
- <guava.version>19.0</guava.version>
- <powermock.version>2.0.9</powermock.version>
- <skipTests>false</skipTests>
- </properties>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <compiler.source>1.8</compiler.source>
+ <compiler.target>1.8</compiler.target>
+ </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>tubemq-client</artifactId>
- <version>${project.version}</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-module-junit4</artifactId>
- <version>${powermock.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-api-mockito2</artifactId>
- <version>${powermock.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>tubemq-client</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
index 69a8ee1..4c82144 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
@@ -17,6 +17,7 @@
package org.apache.inlong.dataproxy.sink.pulsar.federation;
+import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -204,8 +205,11 @@ public class PulsarProducerCluster implements
LifecycleAware {
if (producer == null) {
try {
LOG.info("try to new a object for topic " + topic);
+ SecureRandom secureRandom = new SecureRandom(
+ (workerName + "-" + cacheClusterName + "-" + topic +
System.currentTimeMillis()).getBytes());
+ String producerName = workerName + "-" + cacheClusterName +
"-" + topic + "-" + secureRandom.nextLong();
producer = baseBuilder.clone().topic(topic)
- .producerName(workerName + "-" + cacheClusterName +
"-" + topic)
+ .producerName(producerName)
.create();
LOG.info("create new producer success:{}",
producer.getProducerName());
Producer<byte[]> oldProducer =
this.producerMap.putIfAbsent(topic, producer);
@@ -224,14 +228,17 @@ public class PulsarProducerCluster implements
LifecycleAware {
this.addMetric(event, topic, false, 0);
return false;
}
+ // sendAsync
+ CompletableFuture<MessageId> future = null;
String messageKey = headers.get(Constants.MESSAGE_KEY);
+ long sendTime = System.currentTimeMillis();
if (messageKey == null) {
- messageKey = headers.get(Constants.HEADER_KEY_SOURCE_IP);
+ future = producer.newMessage().properties(headers)
+ .value(event.getBody()).sendAsync();
+ } else {
+ future = producer.newMessage().key(messageKey).properties(headers)
+ .value(event.getBody()).sendAsync();
}
- // sendAsync
- long sendTime = System.currentTimeMillis();
- CompletableFuture<MessageId> future =
producer.newMessage().key(messageKey).properties(headers)
- .value(event.getBody()).sendAsync();
// callback
future.whenCompleteAsync((msgId, ex) -> {
if (ex != null) {
diff --git a/inlong-dataproxy/pom.xml b/inlong-dataproxy/pom.xml
index 791760f..80926ad 100644
--- a/inlong-dataproxy/pom.xml
+++ b/inlong-dataproxy/pom.xml
@@ -48,6 +48,8 @@
<inlong-common.version>1.3.4</inlong-common.version>
<pulsar.version>2.8.1</pulsar.version>
<testng.version>6.14.3</testng.version>
+ <guava.version>19.0</guava.version>
+ <powermock.version>2.0.9</powermock.version>
</properties>
<dependencies>
@@ -107,6 +109,23 @@
<artifactId>testng</artifactId>
<version>${testng.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito2</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/inlong-sort-standalone/pom.xml b/inlong-sort-standalone/pom.xml
index 0672876..ecb04b1 100644
--- a/inlong-sort-standalone/pom.xml
+++ b/inlong-sort-standalone/pom.xml
@@ -50,6 +50,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<compiler.source>1.8</compiler.source>
<compiler.target>1.8</compiler.target>
+ <pulsar.version>2.7.2</pulsar.version>
</properties>
<dependencies>
@@ -84,6 +85,17 @@
<version>${guava.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client</artifactId>
+ <version>${pulsar.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>guava</artifactId>
+ <groupId>com.google.guava</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>${powermock.version}</version>
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSink.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSink.java
new file mode 100644
index 0000000..f069863
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSink.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.pulsar;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flume.Context;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.slf4j.Logger;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+/**
+ *
+ * PulsarFederationSink
+ */
+public class PulsarFederationSink extends AbstractSink implements Configurable
{
+
+ public static final Logger LOG =
InlongLoggerFactory.getLogger(PulsarFederationSink.class);
+
+ private PulsarFederationSinkContext context;
+ private List<PulsarFederationWorker> workers = new ArrayList<>();
+ private Map<String, String> dimensions;
+
+ /**
+ * Creates and starts one worker per configured thread, then starts the sink.
+ */
+ @Override
+ public void start() {
+ String sinkName = this.getName();
+ // create worker
+ for (int i = 0; i < context.getMaxThreads(); i++) {
+ PulsarFederationWorker worker = new
PulsarFederationWorker(sinkName, i, context);
+ worker.start();
+ this.workers.add(worker);
+ }
+ super.start();
+ }
+
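+ /**
+ * Closes every worker (a failure is logged and does not stop the loop),
+ * closes the sink context and then stops the sink.
+ */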
+ @Override
+ public void stop() {
+ for (PulsarFederationWorker worker : workers) {
+ try {
+ worker.close();
+ } catch (Throwable e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ this.context.close();
+ super.stop();
+ }
+
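+ /**
+ * Builds and starts the sink context, then initializes the metric dimensions
+ * (cluster id, task name, sink name) from it.
+ *
+ * @param context the configuration context passed to this sink
+ */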
+ @Override
+ public void configure(Context context) {
+ LOG.info("start to configure:{}, context:{}.",
this.getClass().getSimpleName(), context.toString());
+ this.context = new PulsarFederationSinkContext(getName(), context,
getChannel());
+ this.context.start();
+ this.dimensions = new HashMap<>();
+ this.dimensions.put(SortMetricItem.KEY_CLUSTER_ID,
this.context.getClusterId());
+ this.dimensions.put(SortMetricItem.KEY_TASK_NAME,
this.context.getTaskName());
+ this.dimensions.put(SortMetricItem.KEY_SINK_ID,
this.context.getSinkName());
+ }
+
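+ /**
+ * Does no delivery work itself and always asks the caller to back off.
+ *
+ * @return always Status.BACKOFF
+ * @throws EventDeliveryException declared by the signature; never thrown here
+ */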
+ @Override
+ public Status process() throws EventDeliveryException {
+ return Status.BACKOFF;
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
new file mode 100644
index 0000000..9142c2c
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.pulsar;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.Constants;
+import org.slf4j.Logger;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+/**
+ *
+ * PulsarFederationSinkContext
+ */
+public class PulsarFederationSinkContext extends SinkContext {
+
+ public static final Logger LOG =
InlongLoggerFactory.getLogger(PulsarFederationSinkContext.class);
+
+ private Context producerContext;
+ private Map<String, String> idTopicMap = new ConcurrentHashMap<>();
+ private List<CacheClusterConfig> clusterConfigList = new ArrayList<>();
+
+ /**
+ * Constructor
+ *
+ * @param sinkName
+ * @param context
+ * @param channel
+ */
+ public PulsarFederationSinkContext(String sinkName, Context context,
Channel channel) {
+ super(sinkName, context, channel);
+ }
+
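+ /**
+ * Reloads the task config. When it differs from the current one, rebuilds
+ * the producer context, the uid-to-topic map and the cache cluster list.
+ * Any failure is logged and not rethrown.
+ */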
+ public void reload() {
+ super.reload();
+ try {
+ SortTaskConfig newSortTaskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
+ if (this.sortTaskConfig != null &&
this.sortTaskConfig.equals(newSortTaskConfig)) {
+ return;
+ }
+ this.sortTaskConfig = newSortTaskConfig;
+ this.producerContext = new
Context(this.sortTaskConfig.getSinkParams());
+ // parse the config of id and topic
+ Map<String, String> newIdTopicMap = new ConcurrentHashMap<>();
+ List<Map<String, String>> idList =
this.sortTaskConfig.getIdParams();
+ for (Map<String, String> idParam : idList) {
+ String inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
+ String inlongStreamId =
idParam.get(Constants.INLONG_STREAM_ID);
+ String uid = InlongId.generateUid(inlongGroupId,
inlongStreamId);
+ String topic = idParam.getOrDefault(Constants.TOPIC, uid);
+ newIdTopicMap.put(uid, topic);
+ }
+ // build cache cluster config
+ CacheClusterConfig clusterConfig = new CacheClusterConfig();
+ clusterConfig.setClusterName(this.taskName);
+ clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
+ List<CacheClusterConfig> newClusterConfigList = new ArrayList<>();
+ newClusterConfigList.add(clusterConfig);
+ // change current config
+ this.idTopicMap = newIdTopicMap;
+ this.clusterConfigList = newClusterConfigList;
+ } catch (Throwable e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * get producerContext
+ *
+ * @return the producerContext
+ */
+ public Context getProducerContext() {
+ return producerContext;
+ }
+
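+ /**
+ * Looks up the topic mapped to the given uid.
+ *
+ * @param uid the uid generated from the inlong group id and stream id
+ * @return the mapped topic, or null when the uid has no entry
+ */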
+ public String getTopic(String uid) {
+ return this.idTopicMap.get(uid);
+ }
+
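+ /**
+ * Returns the cache cluster configs built by the latest successful reload.
+ *
+ * @return the current cluster config list (empty until a reload fills it)
+ */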
+ public List<CacheClusterConfig> getCacheClusters() {
+ return this.clusterConfigList;
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java
new file mode 100644
index 0000000..bf2a836
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.pulsar;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.apache.inlong.sort.standalone.utils.Constants;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ *
+ * PulsarFederationWorker
+ */
+public class PulsarFederationWorker extends Thread {
+
+ public static final Logger LOG =
InlongLoggerFactory.getLogger(PulsarFederationWorker.class);
+
+ private final String workerName;
+ private final PulsarFederationSinkContext context;
+
+ private PulsarProducerFederation producerFederation;
+ private LifecycleState status;
+ private Map<String, String> dimensions;
+
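+ /**
+ * Creates a worker named sinkName + "-worker-" + workerIndex together with
+ * its producer federation and metric dimensions.
+ *
+ * @param sinkName name of the owning sink, used as the worker name prefix
+ * @param workerIndex index of this worker, used as the worker name suffix
+ * @param context sink context shared with the producer federation
+ */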
+ public PulsarFederationWorker(String sinkName, int workerIndex,
PulsarFederationSinkContext context) {
+ super();
+ this.workerName = sinkName + "-worker-" + workerIndex;
+ this.context = context;
+ this.producerFederation = new PulsarProducerFederation(workerName,
this.context);
+ this.status = LifecycleState.IDLE;
+ this.dimensions = new HashMap<>();
+ this.dimensions.put(SortMetricItem.KEY_CLUSTER_ID,
this.context.getClusterId());
+ this.dimensions.put(SortMetricItem.KEY_TASK_NAME,
this.context.getTaskName());
+ this.dimensions.put(SortMetricItem.KEY_SINK_ID,
this.context.getSinkName());
+ }
+
+ /**
+ * Starts the producer federation, marks the worker as started and then
+ * starts the thread.
+ */
+ @Override
+ public void start() {
+ this.producerFederation.start();
+ this.status = LifecycleState.START;
+ super.start();
+ }
+
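+ /**
+ * Closes the producer federation and marks the worker as stopped, which
+ * ends the loop in run() once it next checks the status.
+ */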
+ public void close() {
+ // close all producers
+ this.producerFederation.close();
+ this.status = LifecycleState.STOP;
+ }
+
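+ /**
+ * Takes events from the channel until the worker is stopped. For each event
+ * it fills the topic header, updates the send metrics and hands the event and
+ * its transaction to the producer federation. On any failure the transaction
+ * is rolled back and closed, the read-fail metric is incremented and the
+ * worker sleeps one interval.
+ * NOTE(review): when take() returns null the transaction is committed but
+ * not closed before the next iteration.
+ */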
+ @Override
+ public void run() {
+ LOG.info(String.format("start PulsarSetWorker:%s", this.workerName));
+ while (status != LifecycleState.STOP) {
+ Channel channel = context.getChannel();
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ try {
+ Event event = channel.take();
+ if (event == null) {
+ tx.commit();
+ sleepOneInterval();
+ continue;
+ }
+ // fill topic
+ this.fillTopic(event);
+ // metric
+ SortMetricItem.fillInlongId(event, dimensions);
+ this.dimensions.put(SortMetricItem.KEY_SINK_DATA_ID,
event.getHeaders().get(Constants.TOPIC));
+ SortMetricItem metricItem =
this.context.getMetricItemSet().findMetricItem(dimensions);
+ metricItem.sendCount.incrementAndGet();
+ metricItem.sendSize.addAndGet(event.getBody().length);
+ // send
+ this.producerFederation.send(event, tx);
+ } catch (Throwable t) {
+ LOG.error("Process event failed!" + this.getName(), t);
+ try {
+ tx.rollback();
+ tx.close();
+ // metric
+ SortMetricItem metricItem =
this.context.getMetricItemSet().findMetricItem(dimensions);
+ metricItem.readFailCount.incrementAndGet();
+ sleepOneInterval();
+ } catch (Throwable e) {
+ LOG.error("Channel take transaction rollback exception:" +
getName(), e);
+ }
+ }
+ }
+ }
+
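+ /**
+ * Sets the topic header of the event to the topic mapped to its uid. The
+ * headers are left unchanged when the mapped topic is blank.
+ *
+ * @param currentRecord the event whose headers are read and updated
+ */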
+ private void fillTopic(Event currentRecord) {
+ Map<String, String> headers = currentRecord.getHeaders();
+ String inlongGroupId = headers.get(Constants.INLONG_GROUP_ID);
+ String inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
+ String uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+ String topic = this.context.getTopic(uid);
+ if (!StringUtils.isBlank(topic)) {
+ headers.put(Constants.TOPIC, topic);
+ }
+ }
+
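+ /**
+ * Sleeps for the context's process interval.
+ * NOTE(review): an InterruptedException is only logged; the thread's
+ * interrupt flag is not restored.
+ */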
+ private void sleepOneInterval() {
+ try {
+ Thread.sleep(context.getProcessInterval());
+ } catch (InterruptedException e1) {
+ LOG.error(e1.getMessage(), e1);
+ }
+ }
+}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
similarity index 90%
copy from
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
copy to
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
index 69a8ee1..1e7a5e6 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.dataproxy.sink.pulsar.federation;
+package org.apache.inlong.sort.standalone.sink.pulsar;
import java.util.HashMap;
import java.util.Map;
@@ -26,11 +26,13 @@ import java.util.concurrent.TimeUnit;
import org.apache.flume.Context;
import org.apache.flume.Event;
+import org.apache.flume.Transaction;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
-import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.apache.inlong.sort.standalone.utils.Constants;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
@@ -42,7 +44,6 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.shade.org.apache.commons.lang.math.NumberUtils;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
*
@@ -50,7 +51,7 @@ import org.slf4j.LoggerFactory;
*/
public class PulsarProducerCluster implements LifecycleAware {
- public static final Logger LOG =
LoggerFactory.getLogger(PulsarProducerCluster.class);
+ public static final Logger LOG =
InlongLoggerFactory.getLogger(PulsarProducerCluster.class);
public static final String KEY_SERVICE_URL = "serviceUrl";
public static final String KEY_AUTHENTICATION = "authentication";
@@ -113,8 +114,6 @@ public class PulsarProducerCluster implements
LifecycleAware {
.authentication(AuthenticationFactory.token(authentication))
.build();
this.baseBuilder = client.newProducer();
-// Map<String, Object> builderConf = new HashMap<>();
-// builderConf.putAll(context.getParameters());
this.baseBuilder
.hashingScheme(HashingScheme.Murmur3_32Hash)
.enableBatching(context.getBoolean(KEY_ENABLEBATCHING,
true))
@@ -165,7 +164,7 @@ public class PulsarProducerCluster implements
LifecycleAware {
@Override
public void stop() {
this.state = LifecycleState.STOP;
- //
+ // close producer
for (Entry<String, Producer<byte[]>> entry :
this.producerMap.entrySet()) {
try {
entry.getValue().close();
@@ -194,8 +193,9 @@ public class PulsarProducerCluster implements
LifecycleAware {
* send
*
* @param event
+ * @param tx
*/
- public boolean send(Event event) {
+ public boolean send(Event event, Transaction tx) {
// send
Map<String, String> headers = event.getHeaders();
String topic = headers.get(Constants.TOPIC);
@@ -220,7 +220,8 @@ public class PulsarProducerCluster implements
LifecycleAware {
}
// create producer failed
if (producer == null) {
- sinkContext.getBufferQueue().release(event.getBody().length);
+ tx.rollback();
+ tx.close();
this.addMetric(event, topic, false, 0);
return false;
}
@@ -237,10 +238,12 @@ public class PulsarProducerCluster implements
LifecycleAware {
if (ex != null) {
LOG.error("Send fail:{}", ex.getMessage());
LOG.error(ex.getMessage(), ex);
- sinkContext.getBufferQueue().offer(event);
+ tx.rollback();
+ tx.close();
this.addMetric(event, topic, false, 0);
} else {
- sinkContext.getBufferQueue().release(event.getBody().length);
+ tx.commit();
+ tx.close();
this.addMetric(event, topic, true, sendTime);
}
});
@@ -257,12 +260,12 @@ public class PulsarProducerCluster implements
LifecycleAware {
*/
private void addMetric(Event currentRecord, String topic, boolean result,
long sendTime) {
Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID,
this.sinkContext.getProxyClusterId());
+ dimensions.put(SortMetricItem.KEY_CLUSTER_ID,
this.sinkContext.getClusterId());
// metric
- DataProxyMetricItem.fillInlongId(currentRecord, dimensions);
- dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.cacheClusterName);
- dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
- DataProxyMetricItem metricItem =
this.sinkContext.getMetricItemSet().findMetricItem(dimensions);
+ SortMetricItem.fillInlongId(currentRecord, dimensions);
+ dimensions.put(SortMetricItem.KEY_SINK_ID, this.cacheClusterName);
+ dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, topic);
+ SortMetricItem metricItem =
this.sinkContext.getMetricItemSet().findMetricItem(dimensions);
if (result) {
metricItem.sendSuccessCount.incrementAndGet();
metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
new file mode 100644
index 0000000..c914fc8
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.pulsar;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
+import org.slf4j.Logger;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+/**
+ *
+ * PulsarProducerFederation
+ */
+public class PulsarProducerFederation {
+
+ public static final Logger LOG =
InlongLoggerFactory.getLogger(PulsarProducerFederation.class);
+
+ private final String workerName;
+ private final PulsarFederationSinkContext context;
+ private Timer reloadTimer;
+
+ private List<PulsarProducerCluster> clusterList = new ArrayList<>();
+ private List<PulsarProducerCluster> deletingClusterList = new
ArrayList<>();
+
+ private AtomicInteger clusterIndex = new AtomicInteger(0);
+
+ /**
+ * Constructor
+ *
+ * @param workerName
+ * @param context
+ */
+ public PulsarProducerFederation(String workerName,
PulsarFederationSinkContext context) {
+ this.workerName = workerName;
+ this.context = context;
+ }
+
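+ /**
+ * Loads the clusters once and then schedules the periodic reload. Any
+ * exception is logged and not rethrown.
+ */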
+ public void start() {
+ try {
+ this.reload();
+ this.setReloadTimer();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
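+ /**
+ * Cancels the reload timer and stops every cluster in the current list.
+ * NOTE(review): clusters still waiting in the deleting list are not
+ * stopped here.
+ */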
+ public void close() {
+ try {
+ this.reloadTimer.cancel();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ for (PulsarProducerCluster cluster : this.clusterList) {
+ cluster.stop();
+ }
+ }
+
+ /**
+ * Schedules reload() on a daemon timer: first after one reload interval,
+ * then repeatedly at that interval.
+ */
+ private void setReloadTimer() {
+ reloadTimer = new Timer(true);
+ TimerTask task = new TimerTask() {
+
+ public void run() {
+ reload();
+ }
+ };
+ reloadTimer.schedule(task, new Date(System.currentTimeMillis() +
context.getReloadInterval()),
+ context.getReloadInterval());
+ }
+
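+ /**
+ * Stops the clusters queued for deletion by the previous reload, starts a
+ * cluster for each newly configured name, keeps the clusters that are still
+ * configured and queues the removed ones to be stopped on the next reload.
+ * Any failure is logged and not rethrown.
+ */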
+ public void reload() {
+ try {
+ // stop deleted cluster
+ deletingClusterList.forEach(item -> {
+ item.stop();
+ });
+ deletingClusterList.clear();
+ // update cluster list
+ List<CacheClusterConfig> configList =
this.context.getCacheClusters();
+ List<PulsarProducerCluster> newClusterList = new
ArrayList<>(configList.size());
+ // prepare
+ Set<String> newClusterNames = new HashSet<>();
+ configList.forEach(item -> {
+ newClusterNames.add(item.getClusterName());
+ });
+ Set<String> oldClusterNames = new HashSet<>();
+ clusterList.forEach(item -> {
+ oldClusterNames.add(item.getCacheClusterName());
+ });
+ // add
+ for (CacheClusterConfig config : configList) {
+ if (!oldClusterNames.contains(config.getClusterName())) {
+ PulsarProducerCluster cluster = new
PulsarProducerCluster(workerName, config, context);
+ cluster.start();
+ newClusterList.add(cluster);
+ }
+ }
+ // remove
+ for (PulsarProducerCluster cluster : this.clusterList) {
+ if (newClusterNames.contains(cluster.getCacheClusterName())) {
+ newClusterList.add(cluster);
+ } else {
+ deletingClusterList.add(cluster);
+ }
+ }
+ this.clusterList = newClusterList;
+ } catch (Throwable e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
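+ /**
+ * Sends the event through the next cluster, chosen in round-robin order.
+ * NOTE(review): with an empty cluster list the modulo by the list size
+ * throws ArithmeticException.
+ *
+ * @param event the event to send
+ * @param tx the channel transaction handed to the chosen cluster
+ * @return the result of the chosen cluster's send
+ */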
+ public boolean send(Event event, Transaction tx) {
+ int currentIndex = clusterIndex.getAndIncrement();
+ if (currentIndex > Integer.MAX_VALUE / 2) {
+ clusterIndex.set(0);
+ }
+ List<PulsarProducerCluster> currentClusterList = this.clusterList;
+ int currentSize = currentClusterList.size();
+ int realIndex = currentIndex % currentSize;
+ PulsarProducerCluster clusterProducer =
currentClusterList.get(realIndex);
+ return clusterProducer.send(event, tx);
+ }
+}