This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 28759a9816 [INLONG-11958][Sort] Allow SortCkafka to filter out data in TransformFunction (#11959)
28759a9816 is described below

commit 28759a9816e03a97bce8981ea608e040f6f7943f
Author: ChunLiang Lu <[email protected]>
AuthorDate: Fri Aug 8 16:41:28 2025 +0800

    [INLONG-11958][Sort] Allow SortCkafka to filter out data in TransformFunction (#11959)
    
    * [INLONG-11958][Sort] Allow SortCkafka to filter out data in TransformFunction
    
    * [INLONG-11958][Sort] Allow SortCkafka to filter out data in TransformFunction
---
 .../pojo/sort/dataflow/sink/KafkaSinkConfig.java   |  11 ++
 .../kafka/DefaultEvent2KafkaRecordHandler.java     |  58 +++++-
 .../sink/kafka/IEvent2KafkaRecordHandler.java      |   4 +-
 .../sink/kafka/KafkaFederationSinkContext.java     | 207 +++++++++++++++------
 .../sink/kafka/KafkaFederationWorker.java          |  71 ++++---
 .../sort/standalone/sink/kafka/KafkaIdConfig.java  |   3 +
 .../sink/kafka/KafkaProducerCluster.java           |  95 +++++-----
 .../sink/kafka/KafkaProducerFederation.java        |  13 +-
 .../standalone/sink/kafka/KafkaTransaction.java    |  74 ++++++++
 9 files changed, 391 insertions(+), 145 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/KafkaSinkConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/KafkaSinkConfig.java
index f0ec2bd6b1..78abaea839 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/KafkaSinkConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/KafkaSinkConfig.java
@@ -24,5 +24,16 @@ import lombok.EqualsAndHashCode;
 @Data
 public class KafkaSinkConfig extends SinkConfig {
 
+    public static final String MESSAGE_TYPE_CSV = "csv";
+    public static final Character CSV_DEFAULT_DELIMITER = '|';
+    public static final String MESSAGE_TYPE_KV = "kv";
+    public static final Character KV_DEFAULT_ENTRYSPLITTER = '&';
+    public static final Character KV_DEFAULT_KVSPLITTER = '=';
+    public static final String MESSAGE_TYPE_JSON = "json";
     private String topicName;
+    private String messageType;
+    private Character delimiter;
+    private Character escapeChar;
+    private Character entrySplitter;
+    private Character kvSplitter;
 }
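
Note: the new fields above choose the wire format for transformed records. A
minimal sketch of how the defaults are meant to be resolved (the config
instance and setter calls here are illustrative; the real selection logic is
createSinkEncoder in KafkaFederationSinkContext further down):

    // Hypothetical usage, assuming the Lombok-generated accessors from @Data:
    KafkaSinkConfig config = new KafkaSinkConfig();
    config.setMessageType(KafkaSinkConfig.MESSAGE_TYPE_CSV);
    Character delimiter = config.getDelimiter() != null
            ? config.getDelimiter()
            : KafkaSinkConfig.CSV_DEFAULT_DELIMITER;
    // csv encodes a record as f1|f2|f3; kv as f1=v1&f2=v2 ('&' and '=' by
    // default); json as one JSON object per record.
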
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/DefaultEvent2KafkaRecordHandler.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/DefaultEvent2KafkaRecordHandler.java
index 690239778a..baccb4af48 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/DefaultEvent2KafkaRecordHandler.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/DefaultEvent2KafkaRecordHandler.java
@@ -18,16 +18,23 @@
 package org.apache.inlong.sort.standalone.sink.kafka;
 
 import org.apache.inlong.sdk.commons.protocol.EventConstants;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
+import com.google.gson.Gson;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.Logger;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * 
@@ -41,6 +48,7 @@ public class DefaultEvent2KafkaRecordHandler implements IEvent2KafkaRecordHandle
     protected final ByteArrayOutputStream outMsg = new ByteArrayOutputStream();
     protected final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
     protected final Date currentDate = new Date();
+    protected final Gson gson = new Gson();
 
     /**
      * parse
@@ -51,15 +59,51 @@ public class DefaultEvent2KafkaRecordHandler implements IEvent2KafkaRecordHandle
      * @throws IOException
      */
     @Override
-    public ProducerRecord<String, byte[]> parse(KafkaFederationSinkContext context, ProfileEvent event)
+    public List<ProducerRecord<String, byte[]>> parse(KafkaFederationSinkContext context, ProfileEvent event,
+            KafkaIdConfig idConfig)
             throws IOException {
-        String uid = event.getUid();
-        KafkaIdConfig idConfig = context.getIdConfig(uid);
-        if (idConfig == null) {
-            context.addSendResultMetric(event, context.getTaskName(), false, System.currentTimeMillis());
-            LOG.error("Can not find the id config:{}", uid);
-            return null;
+        TransformProcessor<String, ?> processor = context.getTransformProcessor(idConfig.getDataFlowId());
+        if (processor != null) {
+            return this.parseByTransform(context, event, idConfig, processor);
+        } else {
+            ProducerRecord<String, byte[]> record = this.parseByBytes(context, event, idConfig);
+            return Arrays.asList(record);
         }
+    }
+
+    public List<ProducerRecord<String, byte[]>> parseByTransform(KafkaFederationSinkContext context, ProfileEvent event,
+            KafkaIdConfig idConfig, TransformProcessor<String, ?> processor)
+            throws IOException {
+        // extParams
+        Map<String, Object> extParams = new ConcurrentHashMap<>();
+        extParams.putAll(context.getSinkContext().getParameters());
+        event.getHeaders().forEach((k, v) -> extParams.put(k, v));
+        // transform
+        List<?> results = processor.transformForBytes(event.getBody(), extParams);
+        if (results == null) {
+            return new ArrayList<>();
+        }
+        // build
+        String topic = idConfig.getTopic();
+        List<ProducerRecord<String, byte[]>> records = new ArrayList<>(results.size());
+        for (Object result : results) {
+            byte[] msgContent = null;
+            if (result instanceof String) {
+                msgContent = result.toString().getBytes();
+            } else if (result instanceof byte[]) {
+                msgContent = (byte[]) result;
+            } else {
+                msgContent = gson.toJson(result).getBytes();
+            }
+            ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, msgContent);
+            records.add(record);
+        }
+        return records;
+    }
+
+    public ProducerRecord<String, byte[]> parseByBytes(KafkaFederationSinkContext context, ProfileEvent event,
+            KafkaIdConfig idConfig)
+            throws IOException {
         String delimiter = idConfig.getSeparator();
         byte separator = (byte) delimiter.charAt(0);
         outMsg.reset();
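
Note: the reworked handler dispatches per dataflow. With a TransformProcessor
configured, one event may yield zero, one, or many records, and an empty
result means the event was filtered out by the TransformFunction. A rough
caller-side sketch (handler, context, event, and idConfig stand in for the
real objects):

    // Hypothetical fragment: an empty list means "filtered out".
    List<ProducerRecord<String, byte[]>> records = handler.parse(context, event, idConfig);
    if (records.isEmpty()) {
        // nothing to send; the caller acks the dataflow without producing
    }
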
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/IEvent2KafkaRecordHandler.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/IEvent2KafkaRecordHandler.java
index ecde8380d9..1ad51c0470 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/IEvent2KafkaRecordHandler.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/IEvent2KafkaRecordHandler.java
@@ -22,6 +22,7 @@ import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * 
@@ -37,5 +38,6 @@ public interface IEvent2KafkaRecordHandler {
      * @return             ProducerRecord
      * @throws IOException
      */
-    ProducerRecord<String, byte[]> parse(KafkaFederationSinkContext context, ProfileEvent event) throws IOException;
+    List<ProducerRecord<String, byte[]>> parse(KafkaFederationSinkContext context, ProfileEvent event,
+            KafkaIdConfig idConfig) throws IOException;
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
index 1405086f50..439cc8853e 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
@@ -19,34 +19,46 @@ package org.apache.inlong.sort.standalone.sink.kafka;
 
 import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
 import org.apache.inlong.common.pojo.sort.TaskConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.KafkaSinkConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
 import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
 import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.sdk.transform.encode.SinkEncoder;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
-import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
 import org.apache.inlong.sort.standalone.sink.SinkContext;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.slf4j.Logger;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /** Context of kafka sink. */
+@SuppressWarnings("deprecation")
 public class KafkaFederationSinkContext extends SinkContext {
 
     public static final Logger LOG = InlongLoggerFactory.getLogger(KafkaFederationSinkContext.class);
@@ -54,7 +66,10 @@ public class KafkaFederationSinkContext extends SinkContext {
 
     private KafkaNodeConfig kafkaNodeConfig;
     private CacheClusterConfig cacheClusterConfig;
-    private Map<String, KafkaIdConfig> idConfigMap = new ConcurrentHashMap<>();
+    private Map<String, List<KafkaIdConfig>> idConfigMap = new ConcurrentHashMap<>();
+
+    // Map<threadId, Map<dataFlowId,TransformProcess>>
+    protected Map<Long, Map<String, TransformProcessor<String, ?>>> transformMap = new ConcurrentHashMap<>();
 
     public KafkaFederationSinkContext(String sinkName, Context context, Channel channel) {
         super(sinkName, context, channel);
@@ -67,25 +82,20 @@ public class KafkaFederationSinkContext extends SinkContext {
         try {
             TaskConfig newTaskConfig = SortConfigHolder.getTaskConfig(taskName);
             SortTaskConfig newSortTaskConfig = SortClusterConfigHolder.getTaskConfig(taskName);
-            if (newTaskConfig == null && newSortTaskConfig == null) {
-                LOG.error("newSortTaskConfig is null.");
-                return;
-            }
-            if ((this.taskConfig != null && this.taskConfig.equals(newTaskConfig))
-                    && (this.sortTaskConfig != null && this.sortTaskConfig.equals(newSortTaskConfig))) {
+            if ((newTaskConfig == null || StringUtils.equals(this.taskConfigJson, gson.toJson(newTaskConfig)))
+                    && (newSortTaskConfig == null
+                            || StringUtils.equals(this.sortTaskConfigJson, gson.toJson(newSortTaskConfig)))) {
                 LOG.info("Same sortTaskConfig, do nothing.");
                 return;
             }
-
+            LOG.info("get new SortTaskConfig:taskName:{}", taskName);
             if (newTaskConfig != null) {
                 KafkaNodeConfig requestNodeConfig = (KafkaNodeConfig) newTaskConfig.getNodeConfig();
                 if (kafkaNodeConfig == null || requestNodeConfig.getVersion() > kafkaNodeConfig.getVersion()) {
                     this.kafkaNodeConfig = requestNodeConfig;
                 }
             }
-
-            this.taskConfig = newTaskConfig;
-            this.sortTaskConfig = newSortTaskConfig;
+            this.replaceConfig(newTaskConfig, newSortTaskConfig);
 
             CacheClusterConfig clusterConfig = new CacheClusterConfig();
             clusterConfig.setClusterName(this.taskName);
@@ -94,16 +104,21 @@ public class KafkaFederationSinkContext extends SinkContext {
             }
             this.cacheClusterConfig = clusterConfig;
 
-            Map<String, KafkaIdConfig> fromTaskConfig = fromTaskConfig(taskConfig);
-            Map<String, KafkaIdConfig> fromSortTaskConfig = fromSortTaskConfig(sortTaskConfig);
-            SortConfigMetricReporter.reportClusterDiff(clusterId, taskName, fromTaskConfig, fromSortTaskConfig);
+            Map<String, List<KafkaIdConfig>> fromTaskConfig = fromTaskConfig(taskConfig);
+            Map<String, List<KafkaIdConfig>> fromSortTaskConfig = fromSortTaskConfig(sortTaskConfig);
             this.idConfigMap = unifiedConfiguration ? fromTaskConfig : fromSortTaskConfig;
+            if (unifiedConfiguration) {
+                this.idConfigMap = fromTaskConfig;
+                this.transformMap.clear();
+            } else {
+                this.idConfigMap = fromSortTaskConfig;
+            }
         } catch (Throwable e) {
             LOG.error(e.getMessage(), e);
         }
     }
 
-    public Map<String, KafkaIdConfig> fromTaskConfig(TaskConfig taskConfig) {
+    public Map<String, List<KafkaIdConfig>> fromTaskConfig(TaskConfig taskConfig) {
         if (taskConfig == null) {
             return new HashMap<>();
         }
@@ -114,21 +129,24 @@ public class KafkaFederationSinkContext extends SinkContext {
                 .map(KafkaIdConfig::create)
                 .collect(Collectors.toMap(
                        config -> InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
-                        v -> v,
-                        (flow1, flow2) -> flow1));
+                        v -> Arrays.asList(v),
+                        (v1, v2) -> {
+                            v1.addAll(v2);
+                            return v1;
+                        }));
     }
 
-    public Map<String, KafkaIdConfig> fromSortTaskConfig(SortTaskConfig sortTaskConfig) {
+    public Map<String, List<KafkaIdConfig>> fromSortTaskConfig(SortTaskConfig sortTaskConfig) {
         if (sortTaskConfig == null) {
             return new HashMap<>();
         }
 
         List<Map<String, String>> idList = sortTaskConfig.getIdParams();
-        Map<String, KafkaIdConfig> newIdConfigMap = new ConcurrentHashMap<>();
+        Map<String, List<KafkaIdConfig>> newIdConfigMap = new ConcurrentHashMap<>();
         for (Map<String, String> idParam : idList) {
             try {
                 KafkaIdConfig idConfig = new KafkaIdConfig(idParam);
-                newIdConfigMap.put(idConfig.getUid(), idConfig);
+                newIdConfigMap.put(idConfig.getUid(), Arrays.asList(idConfig));
             } catch (Exception e) {
                 LOG.error("fail to parse kafka id config", e);
             }
@@ -144,25 +162,14 @@ public class KafkaFederationSinkContext extends SinkContext {
         return cacheClusterConfig;
     }
 
-    /**
-     * get Topic by uid
-     *
-     * @param  uid uid
-     * @return     topic
-     */
-    public String getTopic(String uid) {
-        KafkaIdConfig idConfig = this.idConfigMap.get(uid);
-        return Objects.isNull(idConfig) ? null : idConfig.getTopic();
-    }
-
     /**
      * get KafkaIdConfig by uid
      *
      * @param  uid uid
      * @return     KafkaIdConfig
      */
-    public KafkaIdConfig getIdConfig(String uid) {
-        KafkaIdConfig idConfig = this.idConfigMap.get(uid);
+    public List<KafkaIdConfig> getIdConfig(String uid) {
+        List<KafkaIdConfig> idConfig = this.idConfigMap.get(uid);
         if (idConfig == null) {
             throw new NullPointerException("uid " + uid + "got null 
KafkaIdConfig");
         }
@@ -176,16 +183,7 @@ public class KafkaFederationSinkContext extends SinkContext {
      * @param topic
      */
     public void addSendMetric(ProfileEvent currentRecord, String topic) {
-        Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
-        dimensions.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName());
-        // metric
-        fillInlongId(currentRecord, dimensions);
-        dimensions.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
-        dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, topic);
-        long msgTime = currentRecord.getRawLogTime();
-        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
-        dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
+        Map<String, String> dimensions = this.getDimensions(currentRecord, topic);
         SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
         long count = 1;
         long size = currentRecord.getBody().length;
@@ -216,16 +214,7 @@ public class KafkaFederationSinkContext extends SinkContext {
      * @param sendTime
      */
     public void addSendResultMetric(ProfileEvent currentRecord, String topic, boolean result, long sendTime) {
-        Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
-        dimensions.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName());
-        // metric
-        fillInlongId(currentRecord, dimensions);
-        dimensions.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
-        dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, topic);
-        long msgTime = currentRecord.getRawLogTime();
-        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
-        dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
+        Map<String, String> dimensions = this.getDimensions(currentRecord, topic);
         SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
         long count = 1;
         long size = currentRecord.getBody().length;
@@ -248,6 +237,27 @@ public class KafkaFederationSinkContext extends SinkContext {
         }
     }
 
+    public void addSendFilterMetric(ProfileEvent currentRecord, String bid) {
+        Map<String, String> dimensions = this.getDimensions(currentRecord, bid);
+        SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
+        metricItem.sendFilterCount.incrementAndGet();
+        metricItem.sendFilterSize.addAndGet(currentRecord.getBody().length);
+    }
+
+    private Map<String, String> getDimensions(ProfileEvent currentRecord, String bid) {
+        Map<String, String> dimensions = new HashMap<>();
+        dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        dimensions.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName());
+        // metric
+        fillInlongId(currentRecord, dimensions);
+        dimensions.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
+        dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, bid);
+        long msgTime = currentRecord.getRawLogTime();
+        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+        dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
+        return dimensions;
+    }
+
     /**
      * create IEvent2ProducerRecordHandler
      *
@@ -270,4 +280,91 @@ public class KafkaFederationSinkContext extends SinkContext {
         }
         return null;
     }
+
+    public TransformProcessor<String, ?> getTransformProcessor(String dataFlowId) {
+        Long threadId = Thread.currentThread().getId();
+        Map<String, TransformProcessor<String, ?>> transformProcessors = this.transformMap.get(threadId);
+        if (transformProcessors == null) {
+            transformProcessors = reloadTransform(taskConfig);
+            this.transformMap.put(threadId, transformProcessors);
+        }
+        return transformProcessors.get(dataFlowId);
+    }
+
+    private Map<String, TransformProcessor<String, ?>> reloadTransform(TaskConfig taskConfig) {
+        ImmutableMap.Builder<String, TransformProcessor<String, ?>> builder = new ImmutableMap.Builder<>();
+
+        taskConfig.getClusterTagConfigs()
+                .stream()
+                .map(ClusterTagConfig::getDataFlowConfigs)
+                .flatMap(Collection::stream)
+                .forEach(flow -> {
+                    if (StringUtils.isEmpty(flow.getTransformSql())) {
+                        return;
+                    }
+                    TransformProcessor<String, ?> transformProcessor = createTransform(flow);
+                    if (transformProcessor == null) {
+                        return;
+                    }
+                    builder.put(flow.getDataflowId(),
+                            transformProcessor);
+                });
+
+        return builder.build();
+    }
+
+    private TransformProcessor<String, ?> createTransform(DataFlowConfig dataFlowConfig) {
+        try {
+            LOG.info("try to create transform:{}", dataFlowConfig.toString());
+            return TransformProcessor.create(
+                    createTransformConfig(dataFlowConfig),
+                    createSourceDecoder(dataFlowConfig.getSourceConfig()),
+                    createSinkEncoder(dataFlowConfig.getSinkConfig()));
+        } catch (Exception e) {
+            LOG.error("failed to reload transform of dataflow={}, ex={}", dataFlowConfig.getDataflowId(),
+                    e.getMessage(), e);
+            return null;
+        }
+    }
+
+    private SinkEncoder<?> createSinkEncoder(SinkConfig sinkConfig) {
+        if (!(sinkConfig instanceof KafkaSinkConfig)) {
+            throw new IllegalArgumentException("sinkInfo must be an instance 
of KafkaSinkConfig");
+        }
+        KafkaSinkConfig sSinkConfig = (KafkaSinkConfig) sinkConfig;
+        List<FieldInfo> fieldInfos = sSinkConfig.getFieldConfigs()
+                .stream()
+                .map(config -> new FieldInfo(config.getName(), 
deriveTypeConverter(config.getFormatInfo())))
+                .collect(Collectors.toList());
+
+        if (StringUtils.equalsIgnoreCase(KafkaSinkConfig.MESSAGE_TYPE_CSV, 
sSinkConfig.getMessageType())) {
+            Character delimiter = sSinkConfig.getDelimiter();
+            if (delimiter == null) {
+                delimiter = KafkaSinkConfig.CSV_DEFAULT_DELIMITER;
+            }
+            CsvSinkInfo sinkInfo = new 
CsvSinkInfo(sinkConfig.getEncodingType(), delimiter, 
sSinkConfig.getEscapeChar(),
+                    fieldInfos);
+            return SinkEncoderFactory.createCsvEncoder(sinkInfo);
+        } else if 
(StringUtils.equalsIgnoreCase(KafkaSinkConfig.MESSAGE_TYPE_KV, 
sSinkConfig.getMessageType())) {
+            Character entrySplitter = sSinkConfig.getEntrySplitter();
+            if (entrySplitter == null) {
+                entrySplitter = KafkaSinkConfig.KV_DEFAULT_ENTRYSPLITTER;
+            }
+            Character kvSplitter = sSinkConfig.getKvSplitter();
+            if (kvSplitter == null) {
+                kvSplitter = KafkaSinkConfig.KV_DEFAULT_KVSPLITTER;
+            }
+            KvSinkInfo sinkInfo = new KvSinkInfo(sinkConfig.getEncodingType(), 
fieldInfos);
+            sinkInfo.setEntryDelimiter(entrySplitter);
+            sinkInfo.setKvDelimiter(kvSplitter);
+            return SinkEncoderFactory.createKvEncoder(sinkInfo);
+        } else if 
(StringUtils.equalsIgnoreCase(KafkaSinkConfig.MESSAGE_TYPE_JSON, 
sSinkConfig.getMessageType())) {
+            MapSinkInfo sinkInfo = new 
MapSinkInfo(sinkConfig.getEncodingType(), fieldInfos);
+            return SinkEncoderFactory.createMapEncoder(sinkInfo);
+        } else {
+            CsvSinkInfo sinkInfo = new 
CsvSinkInfo(sinkConfig.getEncodingType(), KafkaSinkConfig.CSV_DEFAULT_DELIMITER,
+                    null, fieldInfos);
+            return SinkEncoderFactory.createCsvEncoder(sinkInfo);
+        }
+    }
 }
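
Note: transformMap above is keyed by thread id, so every sink worker thread
builds its own TransformProcessor instances (they are not shared across
threads), and a config reload just clears the outer map so each thread
rebuilds lazily on its next lookup. The caching pattern reduced to its
essentials (Processor and buildAll() are placeholders, not InLong API):

    // Per-thread lazy cache; reload = clear the outer map, rebuild on demand.
    private final Map<Long, Map<String, Processor>> cache = new ConcurrentHashMap<>();

    Processor lookup(String dataFlowId) {
        long threadId = Thread.currentThread().getId();
        return cache.computeIfAbsent(threadId, id -> buildAll()).get(dataFlowId);
    }
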
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
index a481743c66..e502afd42b 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
@@ -30,6 +30,9 @@ import org.apache.flume.Transaction;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.slf4j.Logger;
 
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -57,8 +60,7 @@ public class KafkaFederationWorker extends Thread {
         super();
         this.workerName = sinkName + "-" + workerIndex;
         this.context = Preconditions.checkNotNull(context);
-        this.producerFederation =
-                new KafkaProducerFederation(String.valueOf(workerIndex), this.context);
+        this.producerFederation = new KafkaProducerFederation(String.valueOf(workerIndex), this.context);
         this.status = LifecycleState.IDLE;
         this.dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.context.getClusterId());
         this.dimensions.put(SortMetricItem.KEY_TASK_NAME, this.context.getTaskName());
@@ -86,52 +88,73 @@ public class KafkaFederationWorker extends Thread {
     public void run() {
         LOG.info("worker {} start to run, the state is {}", this.workerName, 
status.name());
         while (status != LifecycleState.STOP) {
-            Transaction tx = null;
+            KafkaTransaction ktx = KafkaTransaction.builder().dataFlowIds(new 
HashSet<>()).build();
             try {
                 Channel channel = context.getChannel();
-                tx = channel.getTransaction();
+                Transaction tx = channel.getTransaction();
                 tx.begin();
+                ktx.setTx(tx);
                 Event rowEvent = channel.take();
 
                 // if event is null, close tx and sleep for a while.
                 if (rowEvent == null) {
-                    tx.commit();
-                    tx.close();
+                    ktx.negativeAck();
                     sleepOneInterval();
                     continue;
                 }
+                // if event is not ProfileEvent
                 if (!(rowEvent instanceof ProfileEvent)) {
-                    tx.commit();
-                    tx.close();
+                    ktx.negativeAck();
                     LOG.error("The type of row event is not compatible with 
ProfileEvent");
                     continue;
                 }
 
                 ProfileEvent profileEvent = (ProfileEvent) rowEvent;
-                String topic = this.context.getTopic(profileEvent.getUid());
-                if (StringUtils.isBlank(topic)) {
+                ktx.setProfileEvent(profileEvent);
+                // if there is not config
+                List<KafkaIdConfig> idConfigs = this.context.getIdConfig(profileEvent.getUid());
+                if (idConfigs == null) {
                    this.context.addSendResultMetric(profileEvent, profileEvent.getUid(),
                             false, System.currentTimeMillis());
-                    profileEvent.ack();
-                    tx.commit();
-                    tx.close();
+                    ktx.negativeAck();
+                    continue;
                 }
-                profileEvent.getHeaders().put(Constants.TOPIC, topic);
-                this.context.addSendMetric(profileEvent, topic);
-                this.producerFederation.send(profileEvent, tx);
+                // if there is multi-output
+                idConfigs.forEach(v -> ktx.getDataFlowIds().add(v.getDataFlowId()));
+                this.processEvent(ktx, idConfigs, profileEvent);
             } catch (Exception e) {
                 LOG.error(e.getMessage(), e);
-                if (tx != null) {
-                    tx.rollback();
-                    tx.close();
+                if (ktx.getProfileEvent() != null) {
+                    this.context.addSendResultMetric(ktx.getProfileEvent(),
+                            ktx.getProfileEvent().getUid(),
+                            false, System.currentTimeMillis());
+                } else {
+                    SortMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
+                    metricItem.sendFailCount.incrementAndGet();
                 }
-                // metric
-                SortMetricItem metricItem =
-                        this.context.getMetricItemSet().findMetricItem(dimensions);
-                metricItem.sendFailCount.incrementAndGet();
-                sleepOneInterval();
+                ktx.negativeAck();
+            }
+        }
+    }
+
+    private boolean processEvent(KafkaTransaction ktx, List<KafkaIdConfig> idConfigs, ProfileEvent profileEvent)
+            throws IOException {
+        for (KafkaIdConfig idConfig : idConfigs) {
+            String topic = idConfig.getTopic();
+            if (StringUtils.isBlank(topic)) {
+                LOG.error("can not find the topic,dataFlowId:{},uid:{}", 
idConfig.getDataFlowId(), idConfig.getUid());
+                this.context.addSendResultMetric(profileEvent, 
idConfig.getDataFlowId(),
+                        false, System.currentTimeMillis());
+                // ktx.negativeAck();
+                // return false;
+                ktx.ack(idConfig.getDataFlowId());
+            } else {
+                profileEvent.getHeaders().put(Constants.TOPIC, topic);
+                this.context.addSendMetric(profileEvent, idConfig.getDataFlowId());
+                this.producerFederation.send(ktx, profileEvent, idConfig);
             }
         }
+        return true;
     }
 
     /** sleepOneInterval */
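
Note: the per-event flow in run()/processEvent() now wraps one Flume
transaction in a KafkaTransaction seeded with every dataflow matched by the
event's uid; the transaction commits only after each dataflow has acked. A
condensed sketch (names follow the patch; error handling omitted):

    // Hypothetical fragment of the run() loop:
    KafkaTransaction ktx = KafkaTransaction.builder().dataFlowIds(new HashSet<>()).build();
    ktx.setTx(channel.getTransaction());
    ktx.setProfileEvent(profileEvent);
    List<KafkaIdConfig> idConfigs = context.getIdConfig(profileEvent.getUid());
    idConfigs.forEach(c -> ktx.getDataFlowIds().add(c.getDataFlowId()));
    // each send later calls ktx.ack(dataFlowId) on success or deliberate
    // discard, and ktx.negativeAck() on a hard failure
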
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
index 7ded1917f9..efe2018d21 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
@@ -52,6 +52,7 @@ public class KafkaIdConfig extends IdConfig {
     private String topic;
     private DataTypeEnum dataType;
     private DataFlowConfig dataFlowConfig;
+    private String dataFlowId;
 
     public KafkaIdConfig(Map<String, String> idParam) {
         this.inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
@@ -61,6 +62,7 @@ public class KafkaIdConfig extends IdConfig {
         this.topic = idParam.getOrDefault(Constants.TOPIC, uid);
         this.dataType = DataTypeEnum
                 .convert(idParam.getOrDefault(KafkaIdConfig.KEY_DATA_TYPE, DataTypeEnum.TEXT.getType()));
+        this.dataFlowId = this.uid;
     }
 
     public static KafkaIdConfig create(DataFlowConfig dataFlowConfig) {
@@ -86,6 +88,7 @@ public class KafkaIdConfig extends IdConfig {
                 .dataType(dataType)
                 .separator(separator)
                 .dataFlowConfig(dataFlowConfig)
+                .dataFlowId(dataFlowConfig.getDataflowId())
                 .build();
     }
 
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index d10d32d02e..b8a223d0a8 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -24,7 +24,6 @@ import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import com.google.common.base.Preconditions;
-import org.apache.flume.Transaction;
 import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -35,12 +34,12 @@ import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.datanucleus.util.StringUtils;
 import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.time.Duration;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Properties;
 
 /**
@@ -202,66 +201,60 @@ public class KafkaProducerCluster implements LifecycleAware {
      * @return boolean
      * @throws IOException
      */
-    public boolean send(ProfileEvent profileEvent, Transaction tx) throws IOException {
-        String topic = sinkContext.getTopic(profileEvent.getUid());
-        ProducerRecord<String, byte[]> record = handler.parse(sinkContext, profileEvent);
+    public boolean send(KafkaTransaction ktx, ProfileEvent profileEvent, KafkaIdConfig idConfig) throws IOException {
+        List<ProducerRecord<String, byte[]>> records = handler.parse(sinkContext, profileEvent, idConfig);
         long sendTime = System.currentTimeMillis();
         // check
-        if (record == null || StringUtils.isEmpty(topic)) {
-            tx.commit();
-            profileEvent.ack();
-            tx.close();
-            sinkContext.addSendResultMetric(profileEvent, topic, false, sendTime);
+        if (records == null || records.size() == 0) {
+            sinkContext.addSendFilterMetric(profileEvent, idConfig.getDataFlowId());
+            ktx.ack(idConfig.getDataFlowId());
             return true;
         }
         try {
-            producer.send(record,
-                    (metadata, ex) -> {
-                        if (ex == null) {
-                            tx.commit();
-                            sinkContext.addSendResultMetric(profileEvent, topic, true, sendTime);
-                            profileEvent.ack();
-                        } else {
-
-                            if (ex instanceof RecordTooLargeException) {
-                                // for the message bigger than configuredMaxPayloadSize, just discard it;
-                                // otherwise, retry and wait for the server side changes the limitation
-                                if (record.value().length > configuredMaxPayloadSize) {
-                                    tx.commit();
-                                    profileEvent.ack();
+            for (ProducerRecord<String, byte[]> record : records) {
+                producer.send(record,
+                        (metadata, ex) -> {
+                            if (ex == null) {
+                                sinkContext.addSendResultMetric(profileEvent, idConfig.getDataFlowId(), true, sendTime);
+                                ktx.ack(idConfig.getDataFlowId());
+                            } else {
+                                LOG.error("send failed, uid:{},dataFlowId:{},topic:{},error:{}",
+                                        idConfig.getUid(),
+                                        idConfig.getDataFlowId(),
+                                        idConfig.getTopic(),
+                                        ex.getMessage(),
+                                        ex);
+                                sinkContext.addSendResultMetric(profileEvent, idConfig.getDataFlowId(), false,
+                                        sendTime);
+                                if (ex instanceof RecordTooLargeException) {
+                                    // for the message bigger than configuredMaxPayloadSize, just discard it;
+                                    // otherwise, retry and wait for the server side changes the limitation
+                                    if (record.value().length > configuredMaxPayloadSize) {
+                                        ktx.ack(idConfig.getDataFlowId());
+                                    } else {
+                                        ktx.negativeAck();
+                                    }
+                                } else if (ex instanceof UnknownTopicOrPartitionException
+                                        || !(ex instanceof RetriableException)) {
+                                    // for non-retriable exception, just discard it
+                                    ktx.ack(idConfig.getDataFlowId());
                                 } else {
-                                    this.exceptionProcess(profileEvent, tx);
+                                    ktx.negativeAck();
                                 }
                             }
-                            } else if (ex instanceof UnknownTopicOrPartitionException
-                                    || !(ex instanceof RetriableException)) {
-                                // for non-retriable exception, just discard it
-                                tx.commit();
-                                profileEvent.ack();
-                            } else {
-                                this.exceptionProcess(profileEvent, tx);
                             }
-                            LOG.error(String.format("send failed, topic is %s", topic), ex);
-                            sinkContext.addSendResultMetric(profileEvent, topic, false, sendTime);
-                        }
-                        tx.close();
-                    });
+                        });
+            }
             return true;
         } catch (Exception e) {
-            this.exceptionProcess(profileEvent, tx);
-            tx.close();
-            LOG.error(e.getMessage(), e);
-            sinkContext.addSendResultMetric(profileEvent, topic, false, sendTime);
+            sinkContext.addSendResultMetric(profileEvent, idConfig.getDataFlowId(), false, sendTime);
+            LOG.error("send failed, uid:{},dataFlowId:{},topic:{},error:{}",
+                    idConfig.getUid(),
+                    idConfig.getDataFlowId(),
+                    idConfig.getTopic(),
+                    e.getMessage(),
+                    e);
+            ktx.negativeAck();
             return false;
         }
     }
-
-    private void exceptionProcess(ProfileEvent profileEvent, Transaction tx) {
-        profileEvent.incrementSendedTime();
-        if (profileEvent.getSendedTime() <= CommonPropertiesHolder.getMaxSendFailTimes()) {
-            tx.rollback();
-        } else {
-            tx.commit();
-            profileEvent.negativeAck();
-        }
-    }
 }
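
Note: the send callback's failure policy, restated as a decision table (the
same logic as the diff above, compressed for reference):

    // RecordTooLargeException, record over configuredMaxPayloadSize  -> discard: ktx.ack(dataFlowId)
    // RecordTooLargeException, record within the limit               -> retry:   ktx.negativeAck()
    // UnknownTopicOrPartitionException or any non-RetriableException -> discard: ktx.ack(dataFlowId)
    // any other RetriableException                                   -> retry:   ktx.negativeAck()
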
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
index 219519b907..eec50c9eab 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
@@ -24,7 +24,6 @@ import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import com.google.common.base.Preconditions;
-import org.apache.flume.Transaction;
 import org.slf4j.Logger;
 
 import java.io.IOException;
@@ -103,8 +102,8 @@ public class KafkaProducerFederation implements Runnable {
                 return;
             }
             this.cacheClusterConfig = context.getCacheClusterConfig();
-            KafkaProducerCluster updateCluster =
-                    new KafkaProducerCluster(workerName, cacheClusterConfig, nodeConfig, context);
+            KafkaProducerCluster updateCluster = new KafkaProducerCluster(workerName, cacheClusterConfig, nodeConfig,
+                    context);
             updateCluster.start();
             this.deleteCluster = cluster;
             this.cluster = updateCluster;
@@ -120,8 +119,8 @@ public class KafkaProducerFederation implements Runnable {
                 return;
             }
             this.nodeConfig = context.getNodeConfig();
-            KafkaProducerCluster updateCluster =
-                    new KafkaProducerCluster(workerName, cacheClusterConfig, nodeConfig, context);
+            KafkaProducerCluster updateCluster = new KafkaProducerCluster(workerName, cacheClusterConfig, nodeConfig,
+                    context);
             updateCluster.start();
             this.deleteCluster = cluster;
             this.cluster = updateCluster;
@@ -130,8 +129,8 @@ public class KafkaProducerFederation implements Runnable {
         }
     }
 
-    public boolean send(ProfileEvent profileEvent, Transaction tx) throws IOException {
-        return cluster.send(profileEvent, tx);
+    public boolean send(KafkaTransaction ktx, ProfileEvent profileEvent, KafkaIdConfig idConfig) throws IOException {
+        return cluster.send(ktx, profileEvent, idConfig);
     }
 
     /** Init ScheduledExecutorService with fix reload rate {@link #reloadInterval}. */
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaTransaction.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaTransaction.java
new file mode 100644
index 0000000000..9b4060375e
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaTransaction.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.kafka;
+
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+
+import lombok.Builder;
+import lombok.Data;
+import org.apache.flume.Transaction;
+
+import java.util.Set;
+
+/**
+ * KafkaTransaction
+ * 
+ */
+@Data
+@Builder
+public class KafkaTransaction {
+
+    private Transaction tx;
+    private ProfileEvent profileEvent;
+    private Set<String> dataFlowIds;
+
+    /**
+     * ack
+     */
+    public synchronized void ack(String dataFlowId) {
+        if (dataFlowIds != null) {
+            dataFlowIds.remove(dataFlowId);
+            if (dataFlowIds.size() > 0) {
+                return;
+            }
+            if (profileEvent != null) {
+                profileEvent.ack();
+            }
+            if (tx != null) {
+                tx.commit();
+                tx.close();
+            }
+        }
+    }
+
+    /**
+     * negativeAck
+     */
+    public synchronized void negativeAck() {
+        if (profileEvent != null) {
+            profileEvent.negativeAck();
+        }
+        if (tx != null) {
+            tx.commit();
+            tx.close();
+        }
+        dataFlowIds = null;
+        profileEvent = null;
+        tx = null;
+    }
+}
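
Note: KafkaTransaction is effectively a countdown latch over the dataflows
fanned out from a single channel event. A minimal usage sketch (the flow ids
and the tx/event variables are made up):

    KafkaTransaction ktx = KafkaTransaction.builder()
            .dataFlowIds(new HashSet<>(Arrays.asList("flow-a", "flow-b")))
            .build();
    ktx.setTx(tx);              // Flume transaction from channel.getTransaction()
    ktx.setProfileEvent(event); // the ProfileEvent taken from the channel

    ktx.ack("flow-a");          // set still non-empty: nothing happens yet
    ktx.ack("flow-b");          // last pending flow: event.ack(), tx.commit(), tx.close()
    // a hard failure instead calls ktx.negativeAck(), which commits and closes
    // the tx but negatively acks the event so it can be redelivered
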

