This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 28759a9816 [INLONG-11958][Sort] Allow SortCkafka to filter out data in
TransformFunction (#11959)
28759a9816 is described below
commit 28759a9816e03a97bce8981ea608e040f6f7943f
Author: ChunLiang Lu <[email protected]>
AuthorDate: Fri Aug 8 16:41:28 2025 +0800
[INLONG-11958][Sort] Allow SortCkafka to filter out data in
TransformFunction (#11959)
* [INLONG-11958][Sort] Allow SortCkafka to filter out data in
TransformFunction
* [INLONG-11958][Sort] Allow SortCkafka to filter out data in
TransformFunction
---
.../pojo/sort/dataflow/sink/KafkaSinkConfig.java | 11 ++
.../kafka/DefaultEvent2KafkaRecordHandler.java | 58 +++++-
.../sink/kafka/IEvent2KafkaRecordHandler.java | 4 +-
.../sink/kafka/KafkaFederationSinkContext.java | 207 +++++++++++++++------
.../sink/kafka/KafkaFederationWorker.java | 71 ++++---
.../sort/standalone/sink/kafka/KafkaIdConfig.java | 3 +
.../sink/kafka/KafkaProducerCluster.java | 95 +++++-----
.../sink/kafka/KafkaProducerFederation.java | 13 +-
.../standalone/sink/kafka/KafkaTransaction.java | 74 ++++++++
9 files changed, 391 insertions(+), 145 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/KafkaSinkConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/KafkaSinkConfig.java
index f0ec2bd6b1..78abaea839 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/KafkaSinkConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/KafkaSinkConfig.java
@@ -24,5 +24,16 @@ import lombok.EqualsAndHashCode;
@Data
public class KafkaSinkConfig extends SinkConfig {
+ public static final String MESSAGE_TYPE_CSV = "csv";
+ public static final Character CSV_DEFAULT_DELIMITER = '|';
+ public static final String MESSAGE_TYPE_KV = "kv";
+ public static final Character KV_DEFAULT_ENTRYSPLITTER = '&';
+ public static final Character KV_DEFAULT_KVSPLITTER = '=';
+ public static final String MESSAGE_TYPE_JSON = "json";
private String topicName;
+ private String messageType;
+ private Character delimiter;
+ private Character escapeChar;
+ private Character entrySplitter;
+ private Character kvSplitter;
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/DefaultEvent2KafkaRecordHandler.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/DefaultEvent2KafkaRecordHandler.java
index 690239778a..baccb4af48 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/DefaultEvent2KafkaRecordHandler.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/DefaultEvent2KafkaRecordHandler.java
@@ -18,16 +18,23 @@
package org.apache.inlong.sort.standalone.sink.kafka;
import org.apache.inlong.sdk.commons.protocol.EventConstants;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import com.google.gson.Gson;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
*
@@ -41,6 +48,7 @@ public class DefaultEvent2KafkaRecordHandler implements
IEvent2KafkaRecordHandle
protected final ByteArrayOutputStream outMsg = new ByteArrayOutputStream();
protected final SimpleDateFormat dateFormat = new
SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
protected final Date currentDate = new Date();
+ protected final Gson gson = new Gson();
/**
* parse
@@ -51,15 +59,51 @@ public class DefaultEvent2KafkaRecordHandler implements
IEvent2KafkaRecordHandle
* @throws IOException
*/
@Override
- public ProducerRecord<String, byte[]> parse(KafkaFederationSinkContext
context, ProfileEvent event)
+ public List<ProducerRecord<String, byte[]>>
parse(KafkaFederationSinkContext context, ProfileEvent event,
+ KafkaIdConfig idConfig)
throws IOException {
- String uid = event.getUid();
- KafkaIdConfig idConfig = context.getIdConfig(uid);
- if (idConfig == null) {
- context.addSendResultMetric(event, context.getTaskName(), false,
System.currentTimeMillis());
- LOG.error("Can not find the id config:{}", uid);
- return null;
+ TransformProcessor<String, ?> processor =
context.getTransformProcessor(idConfig.getDataFlowId());
+ if (processor != null) {
+ return this.parseByTransform(context, event, idConfig, processor);
+ } else {
+ ProducerRecord<String, byte[]> record = this.parseByBytes(context,
event, idConfig);
+ return Arrays.asList(record);
}
+ }
+
+ public List<ProducerRecord<String, byte[]>>
parseByTransform(KafkaFederationSinkContext context, ProfileEvent event,
+ KafkaIdConfig idConfig, TransformProcessor<String, ?> processor)
+ throws IOException {
+ // extParams
+ Map<String, Object> extParams = new ConcurrentHashMap<>();
+ extParams.putAll(context.getSinkContext().getParameters());
+ event.getHeaders().forEach((k, v) -> extParams.put(k, v));
+ // transform
+ List<?> results = processor.transformForBytes(event.getBody(),
extParams);
+ if (results == null) {
+ return new ArrayList<>();
+ }
+ // build
+ String topic = idConfig.getTopic();
+ List<ProducerRecord<String, byte[]>> records = new
ArrayList<>(results.size());
+ for (Object result : results) {
+ byte[] msgContent = null;
+ if (result instanceof String) {
+ msgContent = result.toString().getBytes();
+ } else if (result instanceof byte[]) {
+ msgContent = (byte[]) result;
+ } else {
+ msgContent = gson.toJson(result).getBytes();
+ }
+ ProducerRecord<String, byte[]> record = new
ProducerRecord<>(topic, msgContent);
+ records.add(record);
+ }
+ return records;
+ }
+
+ public ProducerRecord<String, byte[]>
parseByBytes(KafkaFederationSinkContext context, ProfileEvent event,
+ KafkaIdConfig idConfig)
+ throws IOException {
String delimiter = idConfig.getSeparator();
byte separator = (byte) delimiter.charAt(0);
outMsg.reset();
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/IEvent2KafkaRecordHandler.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/IEvent2KafkaRecordHandler.java
index ecde8380d9..1ad51c0470 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/IEvent2KafkaRecordHandler.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/IEvent2KafkaRecordHandler.java
@@ -22,6 +22,7 @@ import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.IOException;
+import java.util.List;
/**
*
@@ -37,5 +38,6 @@ public interface IEvent2KafkaRecordHandler {
* @return ProducerRecord
* @throws IOException
*/
- ProducerRecord<String, byte[]> parse(KafkaFederationSinkContext context,
ProfileEvent event) throws IOException;
+ List<ProducerRecord<String, byte[]>> parse(KafkaFederationSinkContext
context, ProfileEvent event,
+ KafkaIdConfig idConfig) throws IOException;
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
index 1405086f50..439cc8853e 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
@@ -19,34 +19,46 @@ package org.apache.inlong.sort.standalone.sink.kafka;
import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
import org.apache.inlong.common.pojo.sort.TaskConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.KafkaSinkConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.sdk.transform.encode.SinkEncoder;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
-import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.slf4j.Logger;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/** Context of kafka sink. */
+@SuppressWarnings("deprecation")
public class KafkaFederationSinkContext extends SinkContext {
public static final Logger LOG =
InlongLoggerFactory.getLogger(KafkaFederationSinkContext.class);
@@ -54,7 +66,10 @@ public class KafkaFederationSinkContext extends SinkContext {
private KafkaNodeConfig kafkaNodeConfig;
private CacheClusterConfig cacheClusterConfig;
- private Map<String, KafkaIdConfig> idConfigMap = new ConcurrentHashMap<>();
+ private Map<String, List<KafkaIdConfig>> idConfigMap = new
ConcurrentHashMap<>();
+
+ // Map<threadId, Map<dataFlowId,TransformProcess>>
+ protected Map<Long, Map<String, TransformProcessor<String, ?>>>
transformMap = new ConcurrentHashMap<>();
public KafkaFederationSinkContext(String sinkName, Context context,
Channel channel) {
super(sinkName, context, channel);
@@ -67,25 +82,20 @@ public class KafkaFederationSinkContext extends SinkContext
{
try {
TaskConfig newTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
SortTaskConfig newSortTaskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
- if (newTaskConfig == null && newSortTaskConfig == null) {
- LOG.error("newSortTaskConfig is null.");
- return;
- }
- if ((this.taskConfig != null &&
this.taskConfig.equals(newTaskConfig))
- && (this.sortTaskConfig != null &&
this.sortTaskConfig.equals(newSortTaskConfig))) {
+ if ((newTaskConfig == null ||
StringUtils.equals(this.taskConfigJson, gson.toJson(newTaskConfig)))
+ && (newSortTaskConfig == null
+ || StringUtils.equals(this.sortTaskConfigJson,
gson.toJson(newSortTaskConfig)))) {
LOG.info("Same sortTaskConfig, do nothing.");
return;
}
-
+ LOG.info("get new SortTaskConfig:taskName:{}", taskName);
if (newTaskConfig != null) {
KafkaNodeConfig requestNodeConfig = (KafkaNodeConfig)
newTaskConfig.getNodeConfig();
if (kafkaNodeConfig == null || requestNodeConfig.getVersion()
> kafkaNodeConfig.getVersion()) {
this.kafkaNodeConfig = requestNodeConfig;
}
}
-
- this.taskConfig = newTaskConfig;
- this.sortTaskConfig = newSortTaskConfig;
+ this.replaceConfig(newTaskConfig, newSortTaskConfig);
CacheClusterConfig clusterConfig = new CacheClusterConfig();
clusterConfig.setClusterName(this.taskName);
@@ -94,16 +104,21 @@ public class KafkaFederationSinkContext extends
SinkContext {
}
this.cacheClusterConfig = clusterConfig;
- Map<String, KafkaIdConfig> fromTaskConfig =
fromTaskConfig(taskConfig);
- Map<String, KafkaIdConfig> fromSortTaskConfig =
fromSortTaskConfig(sortTaskConfig);
- SortConfigMetricReporter.reportClusterDiff(clusterId, taskName,
fromTaskConfig, fromSortTaskConfig);
+ Map<String, List<KafkaIdConfig>> fromTaskConfig =
fromTaskConfig(taskConfig);
+ Map<String, List<KafkaIdConfig>> fromSortTaskConfig =
fromSortTaskConfig(sortTaskConfig);
this.idConfigMap = unifiedConfiguration ? fromTaskConfig :
fromSortTaskConfig;
+ if (unifiedConfiguration) {
+ this.idConfigMap = fromTaskConfig;
+ this.transformMap.clear();
+ } else {
+ this.idConfigMap = fromSortTaskConfig;
+ }
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
}
- public Map<String, KafkaIdConfig> fromTaskConfig(TaskConfig taskConfig) {
+ public Map<String, List<KafkaIdConfig>> fromTaskConfig(TaskConfig
taskConfig) {
if (taskConfig == null) {
return new HashMap<>();
}
@@ -114,21 +129,24 @@ public class KafkaFederationSinkContext extends
SinkContext {
.map(KafkaIdConfig::create)
.collect(Collectors.toMap(
config ->
InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
- v -> v,
- (flow1, flow2) -> flow1));
+ v -> Arrays.asList(v),
+ (v1, v2) -> {
+ v1.addAll(v2);
+ return v1;
+ }));
}
- public Map<String, KafkaIdConfig> fromSortTaskConfig(SortTaskConfig
sortTaskConfig) {
+ public Map<String, List<KafkaIdConfig>> fromSortTaskConfig(SortTaskConfig
sortTaskConfig) {
if (sortTaskConfig == null) {
return new HashMap<>();
}
List<Map<String, String>> idList = sortTaskConfig.getIdParams();
- Map<String, KafkaIdConfig> newIdConfigMap = new ConcurrentHashMap<>();
+ Map<String, List<KafkaIdConfig>> newIdConfigMap = new
ConcurrentHashMap<>();
for (Map<String, String> idParam : idList) {
try {
KafkaIdConfig idConfig = new KafkaIdConfig(idParam);
- newIdConfigMap.put(idConfig.getUid(), idConfig);
+ newIdConfigMap.put(idConfig.getUid(), Arrays.asList(idConfig));
} catch (Exception e) {
LOG.error("fail to parse kafka id config", e);
}
@@ -144,25 +162,14 @@ public class KafkaFederationSinkContext extends
SinkContext {
return cacheClusterConfig;
}
- /**
- * get Topic by uid
- *
- * @param uid uid
- * @return topic
- */
- public String getTopic(String uid) {
- KafkaIdConfig idConfig = this.idConfigMap.get(uid);
- return Objects.isNull(idConfig) ? null : idConfig.getTopic();
- }
-
/**
* get KafkaIdConfig by uid
*
* @param uid uid
* @return KafkaIdConfig
*/
- public KafkaIdConfig getIdConfig(String uid) {
- KafkaIdConfig idConfig = this.idConfigMap.get(uid);
+ public List<KafkaIdConfig> getIdConfig(String uid) {
+ List<KafkaIdConfig> idConfig = this.idConfigMap.get(uid);
if (idConfig == null) {
throw new NullPointerException("uid " + uid + "got null
KafkaIdConfig");
}
@@ -176,16 +183,7 @@ public class KafkaFederationSinkContext extends
SinkContext {
* @param topic
*/
public void addSendMetric(ProfileEvent currentRecord, String topic) {
- Map<String, String> dimensions = new HashMap<>();
- dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
- dimensions.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName());
- // metric
- fillInlongId(currentRecord, dimensions);
- dimensions.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
- dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, topic);
- long msgTime = currentRecord.getRawLogTime();
- long auditFormatTime = msgTime - msgTime %
CommonPropertiesHolder.getAuditFormatInterval();
- dimensions.put(SortMetricItem.KEY_MESSAGE_TIME,
String.valueOf(auditFormatTime));
+ Map<String, String> dimensions = this.getDimensions(currentRecord,
topic);
SortMetricItem metricItem =
this.getMetricItemSet().findMetricItem(dimensions);
long count = 1;
long size = currentRecord.getBody().length;
@@ -216,16 +214,7 @@ public class KafkaFederationSinkContext extends
SinkContext {
* @param sendTime
*/
public void addSendResultMetric(ProfileEvent currentRecord, String topic,
boolean result, long sendTime) {
- Map<String, String> dimensions = new HashMap<>();
- dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
- dimensions.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName());
- // metric
- fillInlongId(currentRecord, dimensions);
- dimensions.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
- dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, topic);
- long msgTime = currentRecord.getRawLogTime();
- long auditFormatTime = msgTime - msgTime %
CommonPropertiesHolder.getAuditFormatInterval();
- dimensions.put(SortMetricItem.KEY_MESSAGE_TIME,
String.valueOf(auditFormatTime));
+ Map<String, String> dimensions = this.getDimensions(currentRecord,
topic);
SortMetricItem metricItem =
this.getMetricItemSet().findMetricItem(dimensions);
long count = 1;
long size = currentRecord.getBody().length;
@@ -248,6 +237,27 @@ public class KafkaFederationSinkContext extends
SinkContext {
}
}
+ public void addSendFilterMetric(ProfileEvent currentRecord, String bid) {
+ Map<String, String> dimensions = this.getDimensions(currentRecord,
bid);
+ SortMetricItem metricItem =
this.getMetricItemSet().findMetricItem(dimensions);
+ metricItem.sendFilterCount.incrementAndGet();
+ metricItem.sendFilterSize.addAndGet(currentRecord.getBody().length);
+ }
+
+ private Map<String, String> getDimensions(ProfileEvent currentRecord,
String bid) {
+ Map<String, String> dimensions = new HashMap<>();
+ dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+ dimensions.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName());
+ // metric
+ fillInlongId(currentRecord, dimensions);
+ dimensions.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
+ dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, bid);
+ long msgTime = currentRecord.getRawLogTime();
+ long auditFormatTime = msgTime - msgTime %
CommonPropertiesHolder.getAuditFormatInterval();
+ dimensions.put(SortMetricItem.KEY_MESSAGE_TIME,
String.valueOf(auditFormatTime));
+ return dimensions;
+ }
+
/**
* create IEvent2ProducerRecordHandler
*
@@ -270,4 +280,91 @@ public class KafkaFederationSinkContext extends
SinkContext {
}
return null;
}
+
+ public TransformProcessor<String, ?> getTransformProcessor(String
dataFlowId) {
+ Long threadId = Thread.currentThread().getId();
+ Map<String, TransformProcessor<String, ?>> transformProcessors =
this.transformMap.get(threadId);
+ if (transformProcessors == null) {
+ transformProcessors = reloadTransform(taskConfig);
+ this.transformMap.put(threadId, transformProcessors);
+ }
+ return transformProcessors.get(dataFlowId);
+ }
+
+ private Map<String, TransformProcessor<String, ?>>
reloadTransform(TaskConfig taskConfig) {
+ ImmutableMap.Builder<String, TransformProcessor<String, ?>> builder =
new ImmutableMap.Builder<>();
+
+ taskConfig.getClusterTagConfigs()
+ .stream()
+ .map(ClusterTagConfig::getDataFlowConfigs)
+ .flatMap(Collection::stream)
+ .forEach(flow -> {
+ if (StringUtils.isEmpty(flow.getTransformSql())) {
+ return;
+ }
+ TransformProcessor<String, ?> transformProcessor =
createTransform(flow);
+ if (transformProcessor == null) {
+ return;
+ }
+ builder.put(flow.getDataflowId(),
+ transformProcessor);
+ });
+
+ return builder.build();
+ }
+
+ private TransformProcessor<String, ?> createTransform(DataFlowConfig
dataFlowConfig) {
+ try {
+ LOG.info("try to create transform:{}", dataFlowConfig.toString());
+ return TransformProcessor.create(
+ createTransformConfig(dataFlowConfig),
+ createSourceDecoder(dataFlowConfig.getSourceConfig()),
+ createSinkEncoder(dataFlowConfig.getSinkConfig()));
+ } catch (Exception e) {
+ LOG.error("failed to reload transform of dataflow={}, ex={}",
dataFlowConfig.getDataflowId(),
+ e.getMessage(), e);
+ return null;
+ }
+ }
+
+ private SinkEncoder<?> createSinkEncoder(SinkConfig sinkConfig) {
+ if (!(sinkConfig instanceof KafkaSinkConfig)) {
+ throw new IllegalArgumentException("sinkInfo must be an instance
of KafkaSinkConfig");
+ }
+ KafkaSinkConfig sSinkConfig = (KafkaSinkConfig) sinkConfig;
+ List<FieldInfo> fieldInfos = sSinkConfig.getFieldConfigs()
+ .stream()
+ .map(config -> new FieldInfo(config.getName(),
deriveTypeConverter(config.getFormatInfo())))
+ .collect(Collectors.toList());
+
+ if (StringUtils.equalsIgnoreCase(KafkaSinkConfig.MESSAGE_TYPE_CSV,
sSinkConfig.getMessageType())) {
+ Character delimiter = sSinkConfig.getDelimiter();
+ if (delimiter == null) {
+ delimiter = KafkaSinkConfig.CSV_DEFAULT_DELIMITER;
+ }
+ CsvSinkInfo sinkInfo = new
CsvSinkInfo(sinkConfig.getEncodingType(), delimiter,
sSinkConfig.getEscapeChar(),
+ fieldInfos);
+ return SinkEncoderFactory.createCsvEncoder(sinkInfo);
+ } else if
(StringUtils.equalsIgnoreCase(KafkaSinkConfig.MESSAGE_TYPE_KV,
sSinkConfig.getMessageType())) {
+ Character entrySplitter = sSinkConfig.getEntrySplitter();
+ if (entrySplitter == null) {
+ entrySplitter = KafkaSinkConfig.KV_DEFAULT_ENTRYSPLITTER;
+ }
+ Character kvSplitter = sSinkConfig.getKvSplitter();
+ if (kvSplitter == null) {
+ kvSplitter = KafkaSinkConfig.KV_DEFAULT_KVSPLITTER;
+ }
+ KvSinkInfo sinkInfo = new KvSinkInfo(sinkConfig.getEncodingType(),
fieldInfos);
+ sinkInfo.setEntryDelimiter(entrySplitter);
+ sinkInfo.setKvDelimiter(kvSplitter);
+ return SinkEncoderFactory.createKvEncoder(sinkInfo);
+ } else if
(StringUtils.equalsIgnoreCase(KafkaSinkConfig.MESSAGE_TYPE_JSON,
sSinkConfig.getMessageType())) {
+ MapSinkInfo sinkInfo = new
MapSinkInfo(sinkConfig.getEncodingType(), fieldInfos);
+ return SinkEncoderFactory.createMapEncoder(sinkInfo);
+ } else {
+ CsvSinkInfo sinkInfo = new
CsvSinkInfo(sinkConfig.getEncodingType(), KafkaSinkConfig.CSV_DEFAULT_DELIMITER,
+ null, fieldInfos);
+ return SinkEncoderFactory.createCsvEncoder(sinkInfo);
+ }
+ }
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
index a481743c66..e502afd42b 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
@@ -30,6 +30,9 @@ import org.apache.flume.Transaction;
import org.apache.flume.lifecycle.LifecycleState;
import org.slf4j.Logger;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -57,8 +60,7 @@ public class KafkaFederationWorker extends Thread {
super();
this.workerName = sinkName + "-" + workerIndex;
this.context = Preconditions.checkNotNull(context);
- this.producerFederation =
- new KafkaProducerFederation(String.valueOf(workerIndex),
this.context);
+ this.producerFederation = new
KafkaProducerFederation(String.valueOf(workerIndex), this.context);
this.status = LifecycleState.IDLE;
this.dimensions.put(SortMetricItem.KEY_CLUSTER_ID,
this.context.getClusterId());
this.dimensions.put(SortMetricItem.KEY_TASK_NAME,
this.context.getTaskName());
@@ -86,52 +88,73 @@ public class KafkaFederationWorker extends Thread {
public void run() {
LOG.info("worker {} start to run, the state is {}", this.workerName,
status.name());
while (status != LifecycleState.STOP) {
- Transaction tx = null;
+ KafkaTransaction ktx = KafkaTransaction.builder().dataFlowIds(new
HashSet<>()).build();
try {
Channel channel = context.getChannel();
- tx = channel.getTransaction();
+ Transaction tx = channel.getTransaction();
tx.begin();
+ ktx.setTx(tx);
Event rowEvent = channel.take();
// if event is null, close tx and sleep for a while.
if (rowEvent == null) {
- tx.commit();
- tx.close();
+ ktx.negativeAck();
sleepOneInterval();
continue;
}
+ // if event is not ProfileEvent
if (!(rowEvent instanceof ProfileEvent)) {
- tx.commit();
- tx.close();
+ ktx.negativeAck();
LOG.error("The type of row event is not compatible with
ProfileEvent");
continue;
}
ProfileEvent profileEvent = (ProfileEvent) rowEvent;
- String topic = this.context.getTopic(profileEvent.getUid());
- if (StringUtils.isBlank(topic)) {
+ ktx.setProfileEvent(profileEvent);
+ // if there is not config
+ List<KafkaIdConfig> idConfigs =
this.context.getIdConfig(profileEvent.getUid());
+ if (idConfigs == null) {
this.context.addSendResultMetric(profileEvent,
profileEvent.getUid(),
false, System.currentTimeMillis());
- profileEvent.ack();
- tx.commit();
- tx.close();
+ ktx.negativeAck();
+ continue;
}
- profileEvent.getHeaders().put(Constants.TOPIC, topic);
- this.context.addSendMetric(profileEvent, topic);
- this.producerFederation.send(profileEvent, tx);
+ // if there is multi-output
+ idConfigs.forEach(v ->
ktx.getDataFlowIds().add(v.getDataFlowId()));
+ this.processEvent(ktx, idConfigs, profileEvent);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
- if (tx != null) {
- tx.rollback();
- tx.close();
+ if (ktx.getProfileEvent() != null) {
+ this.context.addSendResultMetric(ktx.getProfileEvent(),
+ ktx.getProfileEvent().getUid(),
+ false, System.currentTimeMillis());
+ } else {
+ SortMetricItem metricItem =
this.context.getMetricItemSet().findMetricItem(dimensions);
+ metricItem.sendFailCount.incrementAndGet();
}
- // metric
- SortMetricItem metricItem =
-
this.context.getMetricItemSet().findMetricItem(dimensions);
- metricItem.sendFailCount.incrementAndGet();
- sleepOneInterval();
+ ktx.negativeAck();
+ }
+ }
+ }
+
+ private boolean processEvent(KafkaTransaction ktx, List<KafkaIdConfig>
idConfigs, ProfileEvent profileEvent)
+ throws IOException {
+ for (KafkaIdConfig idConfig : idConfigs) {
+ String topic = idConfig.getTopic();
+ if (StringUtils.isBlank(topic)) {
+ LOG.error("can not find the topic,dataFlowId:{},uid:{}",
idConfig.getDataFlowId(), idConfig.getUid());
+ this.context.addSendResultMetric(profileEvent,
idConfig.getDataFlowId(),
+ false, System.currentTimeMillis());
+ // ktx.negativeAck();
+ // return false;
+ ktx.ack(idConfig.getDataFlowId());
+ } else {
+ profileEvent.getHeaders().put(Constants.TOPIC, topic);
+ this.context.addSendMetric(profileEvent,
idConfig.getDataFlowId());
+ this.producerFederation.send(ktx, profileEvent, idConfig);
}
}
+ return true;
}
/** sleepOneInterval */
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
index 7ded1917f9..efe2018d21 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
@@ -52,6 +52,7 @@ public class KafkaIdConfig extends IdConfig {
private String topic;
private DataTypeEnum dataType;
private DataFlowConfig dataFlowConfig;
+ private String dataFlowId;
public KafkaIdConfig(Map<String, String> idParam) {
this.inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
@@ -61,6 +62,7 @@ public class KafkaIdConfig extends IdConfig {
this.topic = idParam.getOrDefault(Constants.TOPIC, uid);
this.dataType = DataTypeEnum
.convert(idParam.getOrDefault(KafkaIdConfig.KEY_DATA_TYPE,
DataTypeEnum.TEXT.getType()));
+ this.dataFlowId = this.uid;
}
public static KafkaIdConfig create(DataFlowConfig dataFlowConfig) {
@@ -86,6 +88,7 @@ public class KafkaIdConfig extends IdConfig {
.dataType(dataType)
.separator(separator)
.dataFlowConfig(dataFlowConfig)
+ .dataFlowId(dataFlowConfig.getDataflowId())
.build();
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index d10d32d02e..b8a223d0a8 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -24,7 +24,6 @@ import
org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import com.google.common.base.Preconditions;
-import org.apache.flume.Transaction;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -35,12 +34,12 @@ import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.datanucleus.util.StringUtils;
import org.slf4j.Logger;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
+import java.util.List;
import java.util.Properties;
/**
@@ -202,66 +201,60 @@ public class KafkaProducerCluster implements
LifecycleAware {
* @return boolean
* @throws IOException
*/
- public boolean send(ProfileEvent profileEvent, Transaction tx) throws
IOException {
- String topic = sinkContext.getTopic(profileEvent.getUid());
- ProducerRecord<String, byte[]> record = handler.parse(sinkContext,
profileEvent);
+ public boolean send(KafkaTransaction ktx, ProfileEvent profileEvent,
KafkaIdConfig idConfig) throws IOException {
+ List<ProducerRecord<String, byte[]>> records =
handler.parse(sinkContext, profileEvent, idConfig);
long sendTime = System.currentTimeMillis();
// check
- if (record == null || StringUtils.isEmpty(topic)) {
- tx.commit();
- profileEvent.ack();
- tx.close();
- sinkContext.addSendResultMetric(profileEvent, topic, false,
sendTime);
+ if (records == null || records.size() == 0) {
+ sinkContext.addSendFilterMetric(profileEvent,
idConfig.getDataFlowId());
+ ktx.ack(idConfig.getDataFlowId());
return true;
}
try {
- producer.send(record,
- (metadata, ex) -> {
- if (ex == null) {
- tx.commit();
- sinkContext.addSendResultMetric(profileEvent,
topic, true, sendTime);
- profileEvent.ack();
- } else {
-
- if (ex instanceof RecordTooLargeException) {
- // for the message bigger than
configuredMaxPayloadSize, just discard it;
- // otherwise, retry and wait for the server
side changes the limitation
- if (record.value().length >
configuredMaxPayloadSize) {
- tx.commit();
- profileEvent.ack();
+ for (ProducerRecord<String, byte[]> record : records) {
+ producer.send(record,
+ (metadata, ex) -> {
+ if (ex == null) {
+ sinkContext.addSendResultMetric(profileEvent,
idConfig.getDataFlowId(), true, sendTime);
+ ktx.ack(idConfig.getDataFlowId());
+ } else {
+ LOG.error("send failed,
uid:{},dataFlowId:{},topic:{},error:{}",
+ idConfig.getUid(),
+ idConfig.getDataFlowId(),
+ idConfig.getTopic(),
+ ex.getMessage(),
+ ex);
+ sinkContext.addSendResultMetric(profileEvent,
idConfig.getDataFlowId(), false,
+ sendTime);
+ if (ex instanceof RecordTooLargeException) {
+ // for the message bigger than
configuredMaxPayloadSize, just discard it;
+ // otherwise, retry and wait for the
server side changes the limitation
+ if (record.value().length >
configuredMaxPayloadSize) {
+ ktx.ack(idConfig.getDataFlowId());
+ } else {
+ ktx.negativeAck();
+ }
+ } else if (ex instanceof
UnknownTopicOrPartitionException
+ || !(ex instanceof
RetriableException)) {
+ // for non-retriable exception, just
discard it
+ ktx.ack(idConfig.getDataFlowId());
} else {
- this.exceptionProcess(profileEvent, tx);
+ ktx.negativeAck();
}
- } else if (ex instanceof
UnknownTopicOrPartitionException
- || !(ex instanceof RetriableException)) {
- // for non-retriable exception, just discard it
- tx.commit();
- profileEvent.ack();
- } else {
- this.exceptionProcess(profileEvent, tx);
}
- LOG.error(String.format("send failed, topic is
%s", topic), ex);
- sinkContext.addSendResultMetric(profileEvent,
topic, false, sendTime);
- }
- tx.close();
- });
+ });
+ }
return true;
} catch (Exception e) {
- this.exceptionProcess(profileEvent, tx);
- tx.close();
- LOG.error(e.getMessage(), e);
- sinkContext.addSendResultMetric(profileEvent, topic, false,
sendTime);
+ sinkContext.addSendResultMetric(profileEvent,
idConfig.getDataFlowId(), false, sendTime);
+ LOG.error("send failed, uid:{},dataFlowId:{},topic:{},error:{}",
+ idConfig.getUid(),
+ idConfig.getDataFlowId(),
+ idConfig.getTopic(),
+ e.getMessage(),
+ e);
+ ktx.negativeAck();
return false;
}
}
-
- private void exceptionProcess(ProfileEvent profileEvent, Transaction tx) {
- profileEvent.incrementSendedTime();
- if (profileEvent.getSendedTime() <=
CommonPropertiesHolder.getMaxSendFailTimes()) {
- tx.rollback();
- } else {
- tx.commit();
- profileEvent.negativeAck();
- }
- }
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
index 219519b907..eec50c9eab 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
@@ -24,7 +24,6 @@ import
org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import com.google.common.base.Preconditions;
-import org.apache.flume.Transaction;
import org.slf4j.Logger;
import java.io.IOException;
@@ -103,8 +102,8 @@ public class KafkaProducerFederation implements Runnable {
return;
}
this.cacheClusterConfig = context.getCacheClusterConfig();
- KafkaProducerCluster updateCluster =
- new KafkaProducerCluster(workerName, cacheClusterConfig,
nodeConfig, context);
+ KafkaProducerCluster updateCluster = new
KafkaProducerCluster(workerName, cacheClusterConfig, nodeConfig,
+ context);
updateCluster.start();
this.deleteCluster = cluster;
this.cluster = updateCluster;
@@ -120,8 +119,8 @@ public class KafkaProducerFederation implements Runnable {
return;
}
this.nodeConfig = context.getNodeConfig();
- KafkaProducerCluster updateCluster =
- new KafkaProducerCluster(workerName, cacheClusterConfig,
nodeConfig, context);
+ KafkaProducerCluster updateCluster = new
KafkaProducerCluster(workerName, cacheClusterConfig, nodeConfig,
+ context);
updateCluster.start();
this.deleteCluster = cluster;
this.cluster = updateCluster;
@@ -130,8 +129,8 @@ public class KafkaProducerFederation implements Runnable {
}
}
-    public boolean send(ProfileEvent profileEvent, Transaction tx) throws IOException {
- return cluster.send(profileEvent, tx);
+    public boolean send(KafkaTransaction ktx, ProfileEvent profileEvent, KafkaIdConfig idConfig) throws IOException {
+ return cluster.send(ktx, profileEvent, idConfig);
}
    /** Init ScheduledExecutorService with fix reload rate {@link #reloadInterval}. */
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaTransaction.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaTransaction.java
new file mode 100644
index 0000000000..9b4060375e
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaTransaction.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.kafka;
+
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+
+import lombok.Builder;
+import lombok.Data;
+import org.apache.flume.Transaction;
+
+import java.util.Set;
+
+/**
+ * KafkaTransaction
+ *
+ */
+@Data
+@Builder
+public class KafkaTransaction {
+
+ private Transaction tx;
+ private ProfileEvent profileEvent;
+ private Set<String> dataFlowIds;
+
+ /**
+ * ack
+ */
+ public synchronized void ack(String dataFlowId) {
+ if (dataFlowIds != null) {
+ dataFlowIds.remove(dataFlowId);
+ if (dataFlowIds.size() > 0) {
+ return;
+ }
+ if (profileEvent != null) {
+ profileEvent.ack();
+ }
+ if (tx != null) {
+ tx.commit();
+ tx.close();
+ }
+ }
+ }
+
+ /**
+ * negativeAck
+ */
+ public synchronized void negativeAck() {
+ if (profileEvent != null) {
+ profileEvent.negativeAck();
+ }
+ if (tx != null) {
+ tx.commit();
+ tx.close();
+ }
+ dataFlowIds = null;
+ profileEvent = null;
+ tx = null;
+ }
+}