This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 435aa3c5ab [INLONG-11883][Sort] Sort CLS supports format conversion 
and data filtering via Transform Functions (#11884)
435aa3c5ab is described below

commit 435aa3c5aba4b6c326d88d205087299e95e22086
Author: ChunLiang Lu <[email protected]>
AuthorDate: Thu Jun 12 17:15:01 2025 +0800

    [INLONG-11883][Sort] Sort CLS supports format conversion and data 
filtering via Transform Functions (#11884)
    
    * [INLONG-11883][Sort] Sort CLS supports format conversion and data 
filtering via Transform Functions
    
    * fix code style problem
---
 .../sort/standalone/metrics/SortMetricItem.java    |  7 ++
 .../sort-standalone-source/pom.xml                 |  5 ++
 .../sort/standalone/sink/cls/ClsChannelWorker.java | 13 ++-
 .../sort/standalone/sink/cls/ClsIdConfig.java      |  3 +
 .../sort/standalone/sink/cls/ClsSinkContext.java   | 94 +++++++++++++++++++++-
 .../sink/cls/DefaultEvent2LogItemHandler.java      | 76 ++++++++++++++++-
 .../standalone/sink/cls/IEvent2LogItemHandler.java |  7 ++
 7 files changed, 201 insertions(+), 4 deletions(-)

diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
index 0daf242f77..1d22eb29ef 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
@@ -63,6 +63,9 @@ public class SortMetricItem extends MetricItem {
     public static final String M_NODE_DURATION = "nodeDuration";
     public static final String M_WHOLE_DURATION = "wholeDuration";
 
+    public static final String M_READ_FILTER_COUNT = "readFilterCount";
+    public static final String M_READ_FILTER_SIZE = "readFilterSize";
+
     @Dimension
     public String clusterId;
     @Dimension
@@ -110,6 +113,10 @@ public class SortMetricItem extends MetricItem {
     @CountMetric
     // sinkCallbackTime - eventCreateTime(milliseconds)
     public AtomicLong wholeDuration = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong sendFilterCount = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong sendFilterSize = new AtomicLong(0);
 
     /**
      * fillInlongId
diff --git a/inlong-sort-standalone/sort-standalone-source/pom.xml 
b/inlong-sort-standalone/sort-standalone-source/pom.xml
index c4461ffc5c..63c4259a9c 100644
--- a/inlong-sort-standalone/sort-standalone-source/pom.xml
+++ b/inlong-sort-standalone/sort-standalone-source/pom.xml
@@ -50,6 +50,11 @@
             <artifactId>audit-sdk</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>ru.yandex.clickhouse</groupId>
             <artifactId>clickhouse-jdbc</artifactId>
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
index 8fb596befe..5b76dd5d2d 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.standalone.sink.cls;
 
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
@@ -31,6 +32,7 @@ import org.apache.flume.lifecycle.LifecycleState;
 import org.slf4j.Logger;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * Cls channel worker.
@@ -136,8 +138,17 @@ public class ClsChannelWorker extends Thread {
             return;
         }
         event.getHeaders().put(ClsSinkContext.KEY_TOPIC_ID, 
idConfig.getTopicId());
+        context.addSendMetric(event, idConfig.getTopicId());
         AsyncProducerClient client = context.getClient(idConfig.getSecretId());
-        List<LogItem> record = handler.parse(context, event);
+        // transform
+        TransformProcessor<String, Map<String, Object>> processor =
+                context.getTransformProcessor(event.getUid());
+        List<LogItem> record = null;
+        if (processor == null) {
+            record = handler.parse(context, event);
+        } else {
+            record = handler.parse(context, event, processor);
+        }
         ClsCallback callback = new ClsCallback(tx, context, event);
         client.putLogs(idConfig.getTopicId(), record, callback);
     }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
index cbf402a83f..a10630f5b5 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.standalone.sink.cls;
 
 import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.sink.ClsSinkConfig;
 import org.apache.inlong.common.pojo.sort.node.ClsNodeConfig;
@@ -43,6 +44,7 @@ import java.util.stream.Collectors;
 public class ClsIdConfig extends IdConfig {
 
     private String separator = "|";
+    private DataTypeConfig dataTypeConfig;
     private String endpoint;
     private String secretId;
     private String secretKey;
@@ -63,6 +65,7 @@ public class ClsIdConfig extends IdConfig {
                 .contentOffset(sinkConfig.getContentOffset())
                 .fieldOffset(sinkConfig.getFieldOffset())
                 .separator(sinkConfig.getSeparator())
+                
.dataTypeConfig(dataFlowConfig.getSourceConfig().getDataTypeConfig())
                 .fieldList(fields)
                 .topicId(sinkConfig.getTopicId())
                 .endpoint(nodeConfig.getEndpoint())
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index f53cc2fa91..12ce29fbe5 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -19,8 +19,16 @@ package org.apache.inlong.sort.standalone.sink.cls;
 
 import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
 import org.apache.inlong.common.pojo.sort.TaskConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.ClsSinkConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
 import org.apache.inlong.common.pojo.sort.node.ClsNodeConfig;
 import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.sdk.transform.encode.MapSinkEncoder;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
@@ -36,10 +44,12 @@ import 
org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
 import com.tencentcloudapi.cls.producer.AsyncProducerClient;
 import com.tencentcloudapi.cls.producer.AsyncProducerConfig;
 import com.tencentcloudapi.cls.producer.errors.ProducerException;
 import com.tencentcloudapi.cls.producer.util.NetworkUtils;
+import lombok.Getter;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -81,6 +91,10 @@ public class ClsSinkContext extends SinkContext {
     private ClsNodeConfig clsNodeConfig;
     private ObjectMapper objectMapper;
 
+    // Map<Uid, TransformProcessor<String sourceType, Map<String, Object> 
sinkType>
+    @Getter
+    protected Map<String, TransformProcessor<String, Map<String, Object>>> 
transformMap;
+
     public ClsSinkContext(String sinkName, Context context, Channel channel) {
         super(sinkName, context, channel);
         this.clientMap = new ConcurrentHashMap<>();
@@ -122,7 +136,14 @@ public class ClsSinkContext extends SinkContext {
             Map<String, ClsIdConfig> fromTaskConfig = 
reloadIdParamsFromTaskConfig(taskConfig, clsNodeConfig);
             Map<String, ClsIdConfig> fromSortTaskConfig = 
reloadIdParamsFromSortTaskConfig(sortTaskConfig);
             SortConfigMetricReporter.reportClusterDiff(clusterId, taskName, 
fromTaskConfig, fromSortTaskConfig);
-            idConfigMap = unifiedConfiguration ? fromTaskConfig : 
fromSortTaskConfig;
+            Map<String, TransformProcessor<String, Map<String, Object>>> 
transformProcessor =
+                    reloadTransform(taskConfig);
+            if (unifiedConfiguration) {
+                idConfigMap = fromTaskConfig;
+                transformMap = transformProcessor;
+            } else {
+                idConfigMap = fromSortTaskConfig;
+            }
             this.reloadClients(idConfigMap);
             this.reloadHandler();
         } catch (Exception e) {
@@ -216,6 +237,18 @@ public class ClsSinkContext extends SinkContext {
         }
         deletingClients.add(clientMap.remove(secretId));
     }
+    /**
+     * Counts one outgoing event in the send metrics.
+     * <p>
+     * Resolves the metric item for the dimensions of the given event and topic, increments its
+     * sendCount by one and adds the length of the event body to its sendSize.
+     *
+     * @param currentRecord event being sent; its body length is the size that gets recorded
+     * @param topic         value passed to getDimensions together with the event
+     */
+    public void addSendMetric(ProfileEvent currentRecord, String topic) {
+        final Map<String, String> metricDimensions = this.getDimensions(currentRecord, topic);
+        final SortMetricItem item = this.getMetricItemSet().findMetricItem(metricDimensions);
+        // one more event handed to the sender ...
+        item.sendCount.incrementAndGet();
+        // ... and its payload size in bytes
+        item.sendSize.addAndGet(currentRecord.getBody().length);
+    }
 
     public void addSendResultMetric(ProfileEvent currentRecord, String bid, 
boolean result, long sendTime) {
         Map<String, String> dimensions = this.getDimensions(currentRecord, 
bid);
@@ -239,6 +272,13 @@ public class ClsSinkContext extends SinkContext {
         }
     }
 
+    /**
+     * Counts one event in the send-filter metrics.
+     * <p>
+     * Resolves the metric item for the dimensions of the given event and bid, increments its
+     * sendFilterCount by one and adds the length of the event body to its sendFilterSize.
+     *
+     * @param currentRecord event to account for; its body length is the size that gets recorded
+     * @param bid           value passed to getDimensions together with the event
+     */
+    public void addSendFilterMetric(ProfileEvent currentRecord, String bid) {
+        final Map<String, String> metricDimensions = this.getDimensions(currentRecord, bid);
+        final SortMetricItem item = this.getMetricItemSet().findMetricItem(metricDimensions);
+        item.sendFilterCount.incrementAndGet();
+        item.sendFilterSize.addAndGet(currentRecord.getBody().length);
+    }
+
+
     private Map<String, String> getDimensions(ProfileEvent currentRecord, 
String bid) {
         Map<String, String> dimensions = new HashMap<>();
         dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
@@ -268,4 +308,56 @@ public class ClsSinkContext extends SinkContext {
     public AsyncProducerClient getClient(String secretId) {
         return clientMap.get(secretId);
     }
+
+    /**
+     * Looks up the transform processor registered for a uid.
+     * <p>
+     * transformMap is only assigned by reload() when the unified configuration is in use, so it can
+     * still be null here; in that case no processor is available and null is returned instead of
+     * failing with a NullPointerException.
+     *
+     * @param uid uid of the data stream
+     * @return the processor registered for the uid, or null when there is none
+     */
+    public TransformProcessor<String, Map<String, Object>> getTransformProcessor(String uid) {
+        // read the field once so a concurrent reload cannot swap it between the check and the lookup
+        final Map<String, TransformProcessor<String, Map<String, Object>>> processors = this.transformMap;
+        if (processors == null) {
+            return null;
+        }
+        return processors.get(uid);
+    }
+
+    /**
+     * Builds the uid -> transform processor map from every data flow of the task config.
+     * <p>
+     * Data flows for which createTransform returns null are left out. When several data flows map
+     * to the same uid the last one wins and a warning is logged; ImmutableMap.Builder would reject
+     * the duplicate key in build() and thereby abort the whole reload.
+     *
+     * @param taskConfig task config to read the data flows from; may be null
+     * @return immutable map keyed by uid; empty when there is nothing to build from
+     */
+    private Map<String, TransformProcessor<String, Map<String, Object>>> reloadTransform(TaskConfig taskConfig) {
+        if (taskConfig == null || taskConfig.getClusterTagConfigs() == null) {
+            return ImmutableMap.of();
+        }
+
+        Map<String, TransformProcessor<String, Map<String, Object>>> processors = new HashMap<>();
+        for (ClusterTagConfig tagConfig : taskConfig.getClusterTagConfigs()) {
+            if (tagConfig == null || tagConfig.getDataFlowConfigs() == null) {
+                continue;
+            }
+            for (DataFlowConfig flow : tagConfig.getDataFlowConfigs()) {
+                if (flow == null) {
+                    continue;
+                }
+                TransformProcessor<String, Map<String, Object>> transformProcessor = createTransform(flow);
+                if (transformProcessor == null) {
+                    continue;
+                }
+                String uid = InlongId.generateUid(flow.getInlongGroupId(), flow.getInlongStreamId());
+                if (processors.put(uid, transformProcessor) != null) {
+                    LOG.warn("duplicated transform of uid={}, dataflow={} replaces the previous one",
+                            uid, flow.getDataflowId());
+                }
+            }
+        }
+
+        return ImmutableMap.copyOf(processors);
+    }
+
+    /**
+     * Creates the transform processor of one data flow.
+     * <p>
+     * The processor is assembled from createTransformConfig, createSourceDecoder and
+     * createClsSinkEncoder. Any failure is logged and turned into a null result.
+     *
+     * @param dataFlowConfig data flow to create the processor for
+     * @return the processor, or null when it could not be created
+     */
+    private TransformProcessor<String, Map<String, Object>> createTransform(DataFlowConfig dataFlowConfig) {
+        try {
+            return TransformProcessor.create(
+                    createTransformConfig(dataFlowConfig),
+                    createSourceDecoder(dataFlowConfig.getSourceConfig()),
+                    createClsSinkEncoder(dataFlowConfig.getSinkConfig()));
+        } catch (Exception e) {
+            // the trailing throwable makes the logger print the stack trace and cause,
+            // which the message alone does not carry
+            LOG.error("failed to reload transform of dataflow={}, ex={}", dataFlowConfig.getDataflowId(),
+                    e.getMessage(), e);
+            return null;
+        }
+    }
+
+    /**
+     * Creates the map sink encoder for a CLS sink config.
+     * <p>
+     * Every field config of the sink becomes a FieldInfo (field name plus the type converter derived
+     * from its format info); the encoder is created from a MapSinkInfo holding the sink's encoding
+     * type and those fields.
+     *
+     * @param sinkConfig sink config, has to be a ClsSinkConfig
+     * @return the map sink encoder
+     * @throws IllegalArgumentException when sinkConfig is not a ClsSinkConfig
+     */
+    private MapSinkEncoder createClsSinkEncoder(SinkConfig sinkConfig) {
+        if (sinkConfig instanceof ClsSinkConfig) {
+            final ClsSinkConfig clsConfig = (ClsSinkConfig) sinkConfig;
+            final List<FieldInfo> sinkFields = clsConfig.getFieldConfigs()
+                    .stream()
+                    .map(field -> new FieldInfo(field.getName(), deriveTypeConverter(field.getFormatInfo())))
+                    .collect(Collectors.toList());
+            return SinkEncoderFactory.createMapEncoder(new MapSinkInfo(sinkConfig.getEncodingType(), sinkFields));
+        }
+        throw new IllegalArgumentException("sinkInfo must be an instance of ClsSinkConfig");
+    }
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/DefaultEvent2LogItemHandler.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/DefaultEvent2LogItemHandler.java
index 7b9e6e6fae..433e3c24dd 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/DefaultEvent2LogItemHandler.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/DefaultEvent2LogItemHandler.java
@@ -17,6 +17,11 @@
 
 package org.apache.inlong.sort.standalone.sink.cls;
 
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+import org.apache.inlong.sort.formats.util.StringUtils;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 import org.apache.inlong.sort.standalone.utils.UnescapeHelper;
@@ -27,9 +32,11 @@ import org.slf4j.Logger;
 import java.nio.charset.Charset;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Default event to logItem handler.
@@ -58,8 +65,30 @@ public class DefaultEvent2LogItemHandler implements 
IEvent2LogItemHandler {
 
         // prepare values
         String stringValues = this.getStringValues(event, idConfig);
-        char delimiter = idConfig.getSeparator().charAt(0);
-        List<String> listValues = UnescapeHelper.toFiledList(stringValues, 
delimiter);
+        List<String> listValues = null;
+        if (idConfig.getDataTypeConfig() == null) {
+            char delimiter = idConfig.getSeparator().charAt(0);
+            listValues = UnescapeHelper.toFiledList(stringValues, delimiter);
+        } else {
+            DataTypeConfig dataTypeConfig = idConfig.getDataTypeConfig();
+            if (dataTypeConfig instanceof CsvConfig) {
+                CsvConfig csvConfig = (CsvConfig) dataTypeConfig;
+                String[] csvArray = StringUtils.splitCsv(stringValues, 
csvConfig.getDelimiter(),
+                        csvConfig.getEscapeChar(), null);
+                listValues = Arrays.asList(csvArray);
+            } else if (dataTypeConfig instanceof KvConfig) {
+                KvConfig kvConfig = (KvConfig) dataTypeConfig;
+                Map<String, String> kvMap = StringUtils.splitKv(stringValues, 
kvConfig.getEntrySplitter(),
+                        kvConfig.getKvSplitter(), kvConfig.getEscapeChar(), 
null);
+                List<String> listKeys = idConfig.getFieldList();
+                final List<String> finalListValues = new 
ArrayList<>(listKeys.size());
+                listKeys.forEach(v -> finalListValues.add(kvMap.get(v)));
+                listValues = finalListValues;
+            } else {
+                char delimiter = idConfig.getSeparator().charAt(0);
+                listValues = UnescapeHelper.toFiledList(stringValues, 
delimiter);
+            }
+        }
         listValues.forEach(value -> this.truncateSingleValue(value, 
context.getKeywordMaxLength()));
         // prepare keys
         List<String> listKeys = idConfig.getFieldList();
@@ -122,4 +151,47 @@ public class DefaultEvent2LogItemHandler implements 
IEvent2LogItemHandler {
         return "";
     }
 
+    /**
+     * Parses an event into CLS log items by running its payload through a transform processor.
+     * <p>
+     * The string payload of the event and a copy of its headers are handed to the processor; every
+     * row it returns becomes one LogItem holding the configured fields followed by "ftime" and
+     * "extinfo".
+     *
+     * @param context   sink context providing the id config, metrics and the keyword length limit
+     * @param event     event to convert
+     * @param processor transform processor applied to the event payload
+     * @return one LogItem per transformed row; an empty list when the processor returns no rows;
+     *         null when there is no id config for the event uid or the processor returns null
+     */
+    @Override
+    public List<LogItem> parse(
+            ClsSinkContext context,
+            ProfileEvent event,
+            TransformProcessor<String, Map<String, Object>> processor) {
+        String uid = event.getUid();
+        ClsIdConfig idConfig = context.getIdConfig(uid);
+        if (idConfig == null) {
+            LOG.error("There is no cls id config for uid {}, discard it", uid);
+            context.addSendResultMetric(event, context.getTaskName(), false, System.currentTimeMillis());
+            return null;
+        }
+
+        // prepare values
+        String stringValues = this.getStringValues(event, idConfig);
+        Map<String, Object> extParams = new ConcurrentHashMap<>();
+        // ConcurrentHashMap rejects null keys and values; skip such headers instead of failing the event
+        event.getHeaders().forEach((k, v) -> {
+            if (k != null && v != null) {
+                extParams.put(k, v);
+            }
+        });
+        List<Map<String, Object>> resultList = processor.transform(stringValues, extParams);
+        if (resultList == null) {
+            context.addSendFilterMetric(event, context.getTaskName());
+            return null;
+        }
+        List<LogItem> itemList = new ArrayList<>(resultList.size());
+        if (resultList.isEmpty()) {
+            // no row came out of the transform: account the event as filtered as well
+            context.addSendFilterMetric(event, context.getTaskName());
+            return itemList;
+        }
+
+        // keys, offset and ftime depend on the event only, not on the individual row
+        List<String> listKeys = idConfig.getFieldList();
+        int fieldOffset = idConfig.getFieldOffset();
+        long rawLogTime = event.getRawLogTime();
+        String ftime = dateFormat.format(new Date(rawLogTime));
+        for (Map<String, Object> result : resultList) {
+            // prepare values in key order; String.valueOf turns a missing field into "null"
+            final List<String> listValues = new ArrayList<>(listKeys.size());
+            listKeys.forEach(v -> listValues.add(String.valueOf(result.get(v))));
+            // NOTE(review): the result of truncateSingleValue is discarded here, so the list keeps
+            // the untruncated strings - confirm whether the returned value should replace the entry
+            listValues.forEach(value -> this.truncateSingleValue(value, context.getKeywordMaxLength()));
+            // convert to LogItem format
+            LogItem item = this.parseToLogItem(listKeys, listValues, rawLogTime, fieldOffset);
+            // add ftime
+            item.PushBack("ftime", ftime);
+            // add extinfo
+            String extinfo = this.getExtInfo(event);
+            item.PushBack("extinfo", extinfo);
+            itemList.add(item);
+        }
+        return itemList;
+    }
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/IEvent2LogItemHandler.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/IEvent2LogItemHandler.java
index e419f61e69..eb96bef5ec 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/IEvent2LogItemHandler.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/IEvent2LogItemHandler.java
@@ -17,11 +17,13 @@
 
 package org.apache.inlong.sort.standalone.sink.cls;
 
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 
 import com.tencentcloudapi.cls.producer.common.LogItem;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * Handler to pares profile event to CLS {@literal List<LogItem>} format.
@@ -36,4 +38,9 @@ public interface IEvent2LogItemHandler {
      * @return {@literal List<LogItem>}
      */
     List<LogItem> parse(ClsSinkContext context, ProfileEvent event);
+
+    /**
+     * Parse event to {@literal List<LogItem>} format with the help of a transform processor.
+     *
+     * @param context   Context of CLS sink.
+     * @param event     Event to be parsed.
+     * @param processor Transform processor to apply to the event.
+     * @return {@literal List<LogItem>}; may be null (DefaultEvent2LogItemHandler returns null when
+     *         there is no id config for the event or the processor returns null)
+     */
+    List<LogItem> parse(
+            ClsSinkContext context,
+            ProfileEvent event,
+            TransformProcessor<String, Map<String, Object>> processor);
 }

Reply via email to