This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 435aa3c5ab [INLONG-11883][Sort] Sort CLS supports format conversion
and data filtering via Transform Functions (#11884)
435aa3c5ab is described below
commit 435aa3c5aba4b6c326d88d205087299e95e22086
Author: ChunLiang Lu <[email protected]>
AuthorDate: Thu Jun 12 17:15:01 2025 +0800
[INLONG-11883][Sort] Sort CLS supports format conversion and data
filtering via Transform Functions (#11884)
* [INLONG-11883][Sort] Sort CLS supports format conversion and data
filtering via Transform Functions
* fix code style problem
---
.../sort/standalone/metrics/SortMetricItem.java | 7 ++
.../sort-standalone-source/pom.xml | 5 ++
.../sort/standalone/sink/cls/ClsChannelWorker.java | 13 ++-
.../sort/standalone/sink/cls/ClsIdConfig.java | 3 +
.../sort/standalone/sink/cls/ClsSinkContext.java | 94 +++++++++++++++++++++-
.../sink/cls/DefaultEvent2LogItemHandler.java | 76 ++++++++++++++++-
.../standalone/sink/cls/IEvent2LogItemHandler.java | 7 ++
7 files changed, 201 insertions(+), 4 deletions(-)
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
index 0daf242f77..1d22eb29ef 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
@@ -63,6 +63,9 @@ public class SortMetricItem extends MetricItem {
public static final String M_NODE_DURATION = "nodeDuration";
public static final String M_WHOLE_DURATION = "wholeDuration";
+ // NOTE(review): these constants say "read" but the fields added below are
+ // sendFilterCount / sendFilterSize; other constants use the field name as value
+ // (e.g. M_WHOLE_DURATION = "wholeDuration" for field wholeDuration) — confirm intended.
+ public static final String M_READ_FILTER_COUNT = "readFilterCount";
+ public static final String M_READ_FILTER_SIZE = "readFilterSize";
+
@Dimension
public String clusterId;
@Dimension
@@ -110,6 +113,10 @@ public class SortMetricItem extends MetricItem {
@CountMetric
// sinkCallbackTime - eventCreateTime(milliseconds)
public AtomicLong wholeDuration = new AtomicLong(0);
+ @CountMetric
+ // number of events counted by ClsSinkContext.addSendFilterMetric (dropped by the transform filter)
+ public AtomicLong sendFilterCount = new AtomicLong(0);
+ @CountMetric
+ // total body length of the events counted in sendFilterCount
+ public AtomicLong sendFilterSize = new AtomicLong(0);
/**
* fillInlongId
diff --git a/inlong-sort-standalone/sort-standalone-source/pom.xml
b/inlong-sort-standalone/sort-standalone-source/pom.xml
index c4461ffc5c..63c4259a9c 100644
--- a/inlong-sort-standalone/sort-standalone-source/pom.xml
+++ b/inlong-sort-standalone/sort-standalone-source/pom.xml
@@ -50,6 +50,11 @@
<artifactId>audit-sdk</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-format-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
index 8fb596befe..5b76dd5d2d 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.standalone.sink.cls;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
@@ -31,6 +32,7 @@ import org.apache.flume.lifecycle.LifecycleState;
import org.slf4j.Logger;
import java.util.List;
+import java.util.Map;
/**
* Cls channel worker.
@@ -136,8 +138,17 @@ public class ClsChannelWorker extends Thread {
return;
}
event.getHeaders().put(ClsSinkContext.KEY_TOPIC_ID,
idConfig.getTopicId());
+ context.addSendMetric(event, idConfig.getTopicId());
AsyncProducerClient client = context.getClient(idConfig.getSecretId());
- List<LogItem> record = handler.parse(context, event);
+ // transform
+ TransformProcessor<String, Map<String, Object>> processor =
+ context.getTransformProcessor(event.getUid());
+ List<LogItem> record = null;
+ if (processor == null) {
+ record = handler.parse(context, event);
+ } else {
+ record = handler.parse(context, event, processor);
+ }
ClsCallback callback = new ClsCallback(tx, context, event);
client.putLogs(idConfig.getTopicId(), record, callback);
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
index cbf402a83f..a10630f5b5 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.standalone.sink.cls;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
import org.apache.inlong.common.pojo.sort.dataflow.sink.ClsSinkConfig;
import org.apache.inlong.common.pojo.sort.node.ClsNodeConfig;
@@ -43,6 +44,7 @@ import java.util.stream.Collectors;
public class ClsIdConfig extends IdConfig {
private String separator = "|";
+ // data type of the source data (e.g. CsvConfig / KvConfig), copied from the data flow's
+ // source config; when null, DefaultEvent2LogItemHandler splits values on 'separator'
+ private DataTypeConfig dataTypeConfig;
private String endpoint;
private String secretId;
private String secretKey;
@@ -63,6 +65,7 @@ public class ClsIdConfig extends IdConfig {
.contentOffset(sinkConfig.getContentOffset())
.fieldOffset(sinkConfig.getFieldOffset())
.separator(sinkConfig.getSeparator())
+
.dataTypeConfig(dataFlowConfig.getSourceConfig().getDataTypeConfig())
.fieldList(fields)
.topicId(sinkConfig.getTopicId())
.endpoint(nodeConfig.getEndpoint())
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index f53cc2fa91..12ce29fbe5 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -19,8 +19,16 @@ package org.apache.inlong.sort.standalone.sink.cls;
import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
import org.apache.inlong.common.pojo.sort.TaskConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.ClsSinkConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
import org.apache.inlong.common.pojo.sort.node.ClsNodeConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.sdk.transform.encode.MapSinkEncoder;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
@@ -36,10 +44,12 @@ import
org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
import com.tencentcloudapi.cls.producer.AsyncProducerClient;
import com.tencentcloudapi.cls.producer.AsyncProducerConfig;
import com.tencentcloudapi.cls.producer.errors.ProducerException;
import com.tencentcloudapi.cls.producer.util.NetworkUtils;
+import lombok.Getter;
import org.apache.commons.lang3.ClassUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
@@ -81,6 +91,10 @@ public class ClsSinkContext extends SinkContext {
private ClsNodeConfig clsNodeConfig;
private ObjectMapper objectMapper;
+    // Map<Uid, TransformProcessor<String sourceType, Map<String, Object> sinkType>>
+    // Starts empty: the reload logic only assigns it when unifiedConfiguration is true,
+    // so without a default every getTransformProcessor() call would throw a NullPointerException.
+    @Getter
+    protected Map<String, TransformProcessor<String, Map<String, Object>>> transformMap = ImmutableMap.of();
+
public ClsSinkContext(String sinkName, Context context, Channel channel) {
super(sinkName, context, channel);
this.clientMap = new ConcurrentHashMap<>();
@@ -122,7 +136,14 @@ public class ClsSinkContext extends SinkContext {
Map<String, ClsIdConfig> fromTaskConfig =
reloadIdParamsFromTaskConfig(taskConfig, clsNodeConfig);
Map<String, ClsIdConfig> fromSortTaskConfig =
reloadIdParamsFromSortTaskConfig(sortTaskConfig);
SortConfigMetricReporter.reportClusterDiff(clusterId, taskName,
fromTaskConfig, fromSortTaskConfig);
- idConfigMap = unifiedConfiguration ? fromTaskConfig :
fromSortTaskConfig;
+ Map<String, TransformProcessor<String, Map<String, Object>>>
transformProcessor =
+ reloadTransform(taskConfig);
+ if (unifiedConfiguration) {
+ idConfigMap = fromTaskConfig;
+ transformMap = transformProcessor;
+ } else {
+ idConfigMap = fromSortTaskConfig;
+ }
this.reloadClients(idConfigMap);
this.reloadHandler();
} catch (Exception e) {
@@ -216,6 +237,18 @@ public class ClsSinkContext extends SinkContext {
}
deletingClients.add(clientMap.remove(secretId));
}
+    /**
+     * Counts one outgoing event in the send metrics.
+     *
+     * <p>Adds one to {@code sendCount} and the length of the event body to
+     * {@code sendSize} on the metric item selected by the event's dimensions.
+     *
+     * @param currentRecord the event being sent
+     * @param topic the CLS topic id, forwarded to {@code getDimensions} to select
+     *              the metric item
+     */
+    public void addSendMetric(ProfileEvent currentRecord, String topic) {
+        SortMetricItem item = this.getMetricItemSet()
+                .findMetricItem(this.getDimensions(currentRecord, topic));
+        item.sendCount.incrementAndGet();
+        item.sendSize.addAndGet(currentRecord.getBody().length);
+    }
public void addSendResultMetric(ProfileEvent currentRecord, String bid,
boolean result, long sendTime) {
Map<String, String> dimensions = this.getDimensions(currentRecord,
bid);
@@ -239,6 +272,13 @@ public class ClsSinkContext extends SinkContext {
}
}
+    /**
+     * Counts one event that was filtered out instead of being sent.
+     *
+     * <p>Adds one to {@code sendFilterCount} and the length of the event body to
+     * {@code sendFilterSize} on the metric item selected by the event's dimensions.
+     *
+     * @param currentRecord the filtered event
+     * @param bid value forwarded to {@code getDimensions} to select the metric item
+     */
+    public void addSendFilterMetric(ProfileEvent currentRecord, String bid) {
+        SortMetricItem item = this.getMetricItemSet()
+                .findMetricItem(this.getDimensions(currentRecord, bid));
+        item.sendFilterCount.incrementAndGet();
+        item.sendFilterSize.addAndGet(currentRecord.getBody().length);
+    }
+
private Map<String, String> getDimensions(ProfileEvent currentRecord,
String bid) {
Map<String, String> dimensions = new HashMap<>();
dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
@@ -268,4 +308,56 @@ public class ClsSinkContext extends SinkContext {
+    /**
+     * Returns the CLS producer client stored under the given secret id.
+     *
+     * @param secretId key of the client in {@code clientMap}
+     * @return the client, or {@code null} if no client is stored for that id
+     */
public AsyncProducerClient getClient(String secretId) {
return clientMap.get(secretId);
}
+
+    /**
+     * Looks up the transform processor configured for an event's data flow.
+     *
+     * @param uid the InLong uid of the event (built from group id and stream id)
+     * @return the processor, or {@code null} when none is configured; callers then
+     *         fall back to the plain, non-transform parse
+     */
+    public TransformProcessor<String, Map<String, Object>> getTransformProcessor(String uid) {
+        // transformMap is only assigned in unified-configuration mode and may still be unset
+        Map<String, TransformProcessor<String, Map<String, Object>>> current = this.transformMap;
+        if (current == null || uid == null) {
+            return null;
+        }
+        return current.get(uid);
+    }
+
+    /**
+     * Builds a transform processor for every data flow in the task config.
+     *
+     * <p>Data flows whose processor cannot be created are skipped; {@code createTransform}
+     * logs the error. If several data flows map to the same uid, the first one is kept
+     * and the rest are logged and ignored. Without this check,
+     * {@code ImmutableMap.Builder#build()} would throw and abort the whole reload.
+     *
+     * @param taskConfig the unified task configuration; may be {@code null}
+     * @return an immutable map from uid to transform processor, never {@code null}
+     */
+    private Map<String, TransformProcessor<String, Map<String, Object>>> reloadTransform(TaskConfig taskConfig) {
+        // reloadTransform runs even when unified configuration is off, so tolerate a missing config
+        if (taskConfig == null || taskConfig.getClusterTagConfigs() == null) {
+            return ImmutableMap.of();
+        }
+        Map<String, TransformProcessor<String, Map<String, Object>>> processors = new HashMap<>();
+        taskConfig.getClusterTagConfigs()
+                .stream()
+                .map(ClusterTagConfig::getDataFlowConfigs)
+                .filter(flows -> flows != null)
+                .flatMap(Collection::stream)
+                .forEach(flow -> {
+                    String uid = InlongId.generateUid(flow.getInlongGroupId(), flow.getInlongStreamId());
+                    if (processors.containsKey(uid)) {
+                        LOG.warn("duplicate transform for uid={}, dataflow={} is ignored",
+                                uid, flow.getDataflowId());
+                        return;
+                    }
+                    TransformProcessor<String, Map<String, Object>> processor = createTransform(flow);
+                    if (processor != null) {
+                        processors.put(uid, processor);
+                    }
+                });
+        return ImmutableMap.copyOf(processors);
+    }
+
+    /**
+     * Creates the transform processor for one data flow.
+     *
+     * <p>Combines the flow's transform config, a decoder for its source config and
+     * a CLS map encoder for its sink config.
+     *
+     * @param dataFlowConfig the data flow to build a processor for
+     * @return the processor, or {@code null} if any part of it could not be created
+     */
+    private TransformProcessor<String, Map<String, Object>> createTransform(DataFlowConfig dataFlowConfig) {
+        try {
+            return TransformProcessor.create(
+                    createTransformConfig(dataFlowConfig),
+                    createSourceDecoder(dataFlowConfig.getSourceConfig()),
+                    createClsSinkEncoder(dataFlowConfig.getSinkConfig()));
+        } catch (Exception e) {
+            // pass the exception as the last argument so SLF4J logs the full stack trace
+            LOG.error("failed to reload transform of dataflow={}", dataFlowConfig.getDataflowId(), e);
+            return null;
+        }
+    }
+
+    /**
+     * Builds the {@link MapSinkEncoder} used as the output encoder of a CLS transform.
+     *
+     * <p>One {@link FieldInfo} is created per field config of the CLS sink. Its type
+     * converter is derived from the field's format info.
+     *
+     * @param sinkConfig the data flow's sink config; must be a {@link ClsSinkConfig}
+     * @return a map encoder over the sink's fields and encoding type
+     * @throws IllegalArgumentException if {@code sinkConfig} is not a {@link ClsSinkConfig}
+     */
+    private MapSinkEncoder createClsSinkEncoder(SinkConfig sinkConfig) {
+        if (!(sinkConfig instanceof ClsSinkConfig)) {
+            throw new IllegalArgumentException("sinkConfig must be an instance of ClsSinkConfig, but was "
+                    + (sinkConfig == null ? "null" : sinkConfig.getClass().getName()));
+        }
+        ClsSinkConfig clsSinkConfig = (ClsSinkConfig) sinkConfig;
+        List<FieldInfo> fieldInfos = clsSinkConfig.getFieldConfigs()
+                .stream()
+                .map(config -> new FieldInfo(config.getName(), deriveTypeConverter(config.getFormatInfo())))
+                .collect(Collectors.toList());
+
+        MapSinkInfo sinkInfo = new MapSinkInfo(sinkConfig.getEncodingType(), fieldInfos);
+        return SinkEncoderFactory.createMapEncoder(sinkInfo);
+    }
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/DefaultEvent2LogItemHandler.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/DefaultEvent2LogItemHandler.java
index 7b9e6e6fae..433e3c24dd 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/DefaultEvent2LogItemHandler.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/DefaultEvent2LogItemHandler.java
@@ -17,6 +17,11 @@
package org.apache.inlong.sort.standalone.sink.cls;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+import org.apache.inlong.sort.formats.util.StringUtils;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.apache.inlong.sort.standalone.utils.UnescapeHelper;
@@ -27,9 +32,11 @@ import org.slf4j.Logger;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Default event to logItem handler.
@@ -58,8 +65,30 @@ public class DefaultEvent2LogItemHandler implements
IEvent2LogItemHandler {
// prepare values
String stringValues = this.getStringValues(event, idConfig);
- char delimiter = idConfig.getSeparator().charAt(0);
- List<String> listValues = UnescapeHelper.toFiledList(stringValues,
delimiter);
+ List<String> listValues = null;
+ if (idConfig.getDataTypeConfig() == null) {
+ char delimiter = idConfig.getSeparator().charAt(0);
+ listValues = UnescapeHelper.toFiledList(stringValues, delimiter);
+ } else {
+ DataTypeConfig dataTypeConfig = idConfig.getDataTypeConfig();
+ if (dataTypeConfig instanceof CsvConfig) {
+ CsvConfig csvConfig = (CsvConfig) dataTypeConfig;
+ String[] csvArray = StringUtils.splitCsv(stringValues,
csvConfig.getDelimiter(),
+ csvConfig.getEscapeChar(), null);
+ listValues = Arrays.asList(csvArray);
+ } else if (dataTypeConfig instanceof KvConfig) {
+ KvConfig kvConfig = (KvConfig) dataTypeConfig;
+ Map<String, String> kvMap = StringUtils.splitKv(stringValues,
kvConfig.getEntrySplitter(),
+ kvConfig.getKvSplitter(), kvConfig.getEscapeChar(),
null);
+ List<String> listKeys = idConfig.getFieldList();
+ final List<String> finalListValues = new
ArrayList<>(listKeys.size());
+ listKeys.forEach(v -> finalListValues.add(kvMap.get(v)));
+ listValues = finalListValues;
+ } else {
+ char delimiter = idConfig.getSeparator().charAt(0);
+ listValues = UnescapeHelper.toFiledList(stringValues,
delimiter);
+ }
+ }
listValues.forEach(value -> this.truncateSingleValue(value,
context.getKeywordMaxLength()));
// prepare keys
List<String> listKeys = idConfig.getFieldList();
@@ -122,4 +151,47 @@ public class DefaultEvent2LogItemHandler implements
IEvent2LogItemHandler {
return "";
}
+    /**
+     * Parses an event into CLS log items by running it through a transform processor.
+     *
+     * <p>The event's string values are transformed, with the event headers passed as
+     * extra parameters. Each output row becomes one {@link LogItem}. Its values follow
+     * the configured field list, and fields missing from a row are sent as empty
+     * strings. Every item also gets {@code ftime} and {@code extinfo} entries.
+     *
+     * @param context the sink context supplying id configs and metrics
+     * @param event the event to parse
+     * @param processor the transform processor for the event's uid
+     * @return the log items, or {@code null} if there is no id config for the event
+     *         or the transform produced no rows (the event was filtered out)
+     */
+    @Override
+    public List<LogItem> parse(
+            ClsSinkContext context,
+            ProfileEvent event,
+            TransformProcessor<String, Map<String, Object>> processor) {
+        String uid = event.getUid();
+        ClsIdConfig idConfig = context.getIdConfig(uid);
+        if (idConfig == null) {
+            LOG.error("There is no cls id config for uid {}, discard it", uid);
+            context.addSendResultMetric(event, context.getTaskName(), false, System.currentTimeMillis());
+            return null;
+        }
+
+        // prepare values; ConcurrentHashMap rejects null keys/values, so skip those headers
+        String stringValues = this.getStringValues(event, idConfig);
+        Map<String, Object> extParams = new ConcurrentHashMap<>();
+        event.getHeaders().forEach((k, v) -> {
+            if (k != null && v != null) {
+                extParams.put(k, v);
+            }
+        });
+        List<Map<String, Object>> resultList = processor.transform(stringValues, extParams);
+        // the transform returns an empty list when every row is dropped by its filter
+        if (resultList == null || resultList.isEmpty()) {
+            context.addSendFilterMetric(event, context.getTaskName());
+            return null;
+        }
+
+        // these depend only on the event and its config, not on the individual row
+        List<String> listKeys = idConfig.getFieldList();
+        int fieldOffset = idConfig.getFieldOffset();
+        String ftime = dateFormat.format(new Date(event.getRawLogTime()));
+        String extinfo = this.getExtInfo(event);
+
+        List<LogItem> itemList = new ArrayList<>(resultList.size());
+        for (Map<String, Object> result : resultList) {
+            List<String> listValues = new ArrayList<>(listKeys.size());
+            for (String key : listKeys) {
+                Object value = result.get(key);
+                // avoid sending the literal string "null" for fields missing from the row
+                listValues.add(value == null ? "" : String.valueOf(value));
+            }
+            // NOTE(review): the return value of truncateSingleValue is discarded here,
+            // so this line cannot shorten the values in listValues — confirm intent.
+            listValues.forEach(value -> this.truncateSingleValue(value, context.getKeywordMaxLength()));
+            // convert to LogItem format
+            LogItem item = this.parseToLogItem(listKeys, listValues, event.getRawLogTime(), fieldOffset);
+            item.PushBack("ftime", ftime);
+            item.PushBack("extinfo", extinfo);
+            itemList.add(item);
+        }
+        return itemList;
+    }
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/IEvent2LogItemHandler.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/IEvent2LogItemHandler.java
index e419f61e69..eb96bef5ec 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/IEvent2LogItemHandler.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/IEvent2LogItemHandler.java
@@ -17,11 +17,13 @@
package org.apache.inlong.sort.standalone.sink.cls;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import com.tencentcloudapi.cls.producer.common.LogItem;
import java.util.List;
+import java.util.Map;
/**
* Handler to pares profile event to CLS {@literal List<LogItem>} format.
@@ -36,4 +38,9 @@ public interface IEvent2LogItemHandler {
* @return {@literal List<LogItem>}
*/
List<LogItem> parse(ClsSinkContext context, ProfileEvent event);
+
+    /**
+     * Parses a profile event to CLS {@literal List<LogItem>} format using a transform processor.
+     *
+     * <p>The default implementation ignores {@code processor} and delegates to
+     * {@link #parse(ClsSinkContext, ProfileEvent)}. Handler implementations written
+     * before this overload existed therefore keep compiling and working instead of
+     * failing with {@code AbstractMethodError}.
+     *
+     * @param context the sink context
+     * @param event the event to parse
+     * @param processor the transform processor configured for the event's uid
+     * @return {@literal List<LogItem>}
+     */
+    default List<LogItem> parse(
+            ClsSinkContext context,
+            ProfileEvent event,
+            TransformProcessor<String, Map<String, Object>> processor) {
+        return parse(context, event);
+    }
}