This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 58186709c3 [INLONG-11888][Sort] Sort ElasticSearch supports format 
conversion and data filtering via Key/Value data format (#11889)
58186709c3 is described below

commit 58186709c3c51f0bd041e7baf2dc7137b980c431
Author: ChunLiang Lu <[email protected]>
AuthorDate: Mon Jun 16 18:57:56 2025 +0800

    [INLONG-11888][Sort] Sort ElasticSearch supports format conversion and 
data filtering via Key/Value data format (#11889)
---
 .../DefaultEvent2IndexRequestHandler.java          | 43 +++++++++++++++++++---
 .../sink/elasticsearch/EsChannelWorker.java        | 14 +++++--
 .../standalone/sink/elasticsearch/EsIdConfig.java  |  3 ++
 .../sink/elasticsearch/EsSinkContext.java          | 21 +++++++++++
 4 files changed, 72 insertions(+), 9 deletions(-)

diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
index d67b60beca..0d91668e53 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
@@ -17,8 +17,12 @@
 
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
 import org.apache.inlong.sdk.commons.protocol.EventConstants;
 import org.apache.inlong.sdk.transform.process.TransformProcessor;
+import org.apache.inlong.sort.formats.util.StringUtils;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.utils.UnescapeHelper;
 
@@ -26,6 +30,8 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.nio.charset.Charset;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -61,8 +67,7 @@ public class DefaultEvent2IndexRequestHandler implements 
IEvent2IndexRequestHand
             return null;
         }
         // parse fields
-        String delimeter = idConfig.getSeparator();
-        char cDelimeter = delimeter.charAt(0);
+        String strDelimiter = idConfig.getSeparator();
         String strContext = null;
         // for tab separator
         byte[] bodyBytes = event.getBody();
@@ -73,9 +78,34 @@ public class DefaultEvent2IndexRequestHandler implements 
IEvent2IndexRequestHand
         } else {
             strContext = new String(bodyBytes, Charset.defaultCharset());
         }
+        List<String> columnValues = null;
+        if (idConfig.getDataTypeConfig() == null) {
+            char delimiter = idConfig.getSeparator().charAt(0);
+            columnValues = UnescapeHelper.toFiledList(strContext, delimiter);
+        } else {
+            DataTypeConfig dataTypeConfig = idConfig.getDataTypeConfig();
+            if (dataTypeConfig instanceof CsvConfig) {
+                CsvConfig csvConfig = (CsvConfig) dataTypeConfig;
+                String[] csvArray = StringUtils.splitCsv(strContext, 
csvConfig.getDelimiter(),
+                        csvConfig.getEscapeChar(), null);
+                columnValues = Arrays.asList(csvArray);
+                strDelimiter = csvConfig.getDelimiter().toString();
+            } else if (dataTypeConfig instanceof KvConfig) {
+                KvConfig kvConfig = (KvConfig) dataTypeConfig;
+                Map<String, String> kvMap = StringUtils.splitKv(strContext, 
kvConfig.getEntrySplitter(),
+                        kvConfig.getKvSplitter(), kvConfig.getEscapeChar(), 
null);
+                List<String> listKeys = idConfig.getFieldList();
+                final List<String> finalListValues = new 
ArrayList<>(listKeys.size());
+                listKeys.forEach(v -> finalListValues.add(kvMap.get(v)));
+                columnValues = finalListValues;
+                strDelimiter = kvConfig.getEntrySplitter().toString();
+            } else {
+                char delimiter = idConfig.getSeparator().charAt(0);
+                columnValues = UnescapeHelper.toFiledList(strContext, 
delimiter);
+            }
+        }
         // unescape
-        List<String> columnVlues = UnescapeHelper.toFiledList(strContext, 
cDelimeter);
-        int valueLength = columnVlues.size();
+        int valueLength = columnValues.size();
         List<String> fieldList = idConfig.getFieldList();
         int columnLength = fieldList.size();
         // field offset
@@ -85,7 +115,7 @@ public class DefaultEvent2IndexRequestHandler implements 
IEvent2IndexRequestHand
         for (int i = fieldOffset; i < columnLength; ++i) {
             String fieldName = fieldList.get(i);
             int columnIndex = i - fieldOffset;
-            String fieldValue = columnIndex < valueLength ? 
columnVlues.get(columnIndex) : "";
+            String fieldValue = columnIndex < valueLength ? 
columnValues.get(columnIndex) : "";
             byte[] fieldBytes = fieldValue.getBytes(Charset.defaultCharset());
             if (fieldBytes.length > context.getKeywordMaxLength()) {
                 fieldValue = new String(fieldBytes, 0, 
context.getKeywordMaxLength());
@@ -103,7 +133,8 @@ public class DefaultEvent2IndexRequestHandler implements 
IEvent2IndexRequestHand
         // build
         EsIndexRequest indexRequest = new EsIndexRequest(indexName, event);
         if (context.isUseIndexId()) {
-            String esIndexId = uid + delimeter + event.getRawLogTime() + 
delimeter + esIndexIndex.incrementAndGet();
+            String esIndexId =
+                    uid + strDelimiter + event.getRawLogTime() + strDelimiter 
+ esIndexIndex.incrementAndGet();
             indexRequest.id(esIndexId);
         }
         indexRequest.source(fieldMap);
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
index 93a29e0142..877f815f9b 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
@@ -95,6 +95,15 @@ public class EsChannelWorker extends Thread {
             }
             // to profileEvent
             ProfileEvent profileEvent = (ProfileEvent) event;
+            String uid = profileEvent.getUid();
+            EsIdConfig idConfig = context.getIdConfig(uid);
+            if (idConfig == null) {
+                tx.commit();
+                profileEvent.ack();
+                LOG.error("There is no id config for uid {}, discard it", 
profileEvent.getUid());
+                context.addSendResultMetric(profileEvent, 
context.getTaskName(), false, System.currentTimeMillis());
+                return;
+            }
             TransformProcessor<String, Map<String, Object>> processor =
                     context.getTransformProcessor(profileEvent.getUid());
             if (processor == null) {
@@ -103,8 +112,8 @@ public class EsChannelWorker extends Thread {
                 if (indexRequest != null) {
                     context.offerDispatchQueue(indexRequest);
                 } else {
-                    context.addSendFailMetric();
                     profileEvent.ack();
+                    context.addSendResultMetric(profileEvent, 
context.getTaskName(), false, System.currentTimeMillis());
                 }
             } else {
                 List<EsIndexRequest> indexRequestList = handler.parse(
@@ -112,12 +121,11 @@ public class EsChannelWorker extends Thread {
                 if (CollectionUtils.isNotEmpty(indexRequestList)) {
                     indexRequestList.forEach(context::offerDispatchQueue);
                 } else {
-                    context.addSendFailMetric();
                     profileEvent.ack();
+                    context.addSendFilterMetric(profileEvent, uid);
                 }
             }
             tx.commit();
-
         } catch (Throwable t) {
             LOG.error("Process event failed!" + this.getName(), t);
             try {
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
index 3136a49276..06805bf764 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
 import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.sink.EsSinkConfig;
 import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
@@ -69,6 +70,7 @@ public class EsIdConfig extends IdConfig {
     };
 
     private String separator = "|";
+    private DataTypeConfig dataTypeConfig;
     private String indexNamePattern;
     private String fieldNames;
     private int fieldOffset = 2; // for ftime,extinfo
@@ -97,6 +99,7 @@ public class EsIdConfig extends IdConfig {
                 .contentOffset(sinkConfig.getContentOffset())
                 .fieldOffset(sinkConfig.getFieldOffset())
                 .separator(sinkConfig.getSeparator())
+                
.dataTypeConfig(dataFlowConfig.getSourceConfig().getDataTypeConfig())
                 .indexNamePattern(sinkConfig.getIndexNamePattern())
                 .fieldList(fields)
                 .charset(charset)
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index 9c9664d775..0a07ff3c31 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -419,6 +419,27 @@ public class EsSinkContext extends SinkContext {
         }
     }
 
+    public void addSendFilterMetric(ProfileEvent currentRecord, String bid) {
+        Map<String, String> dimensions = this.getDimensions(currentRecord, 
bid);
+        SortMetricItem metricItem = 
this.getMetricItemSet().findMetricItem(dimensions);
+        metricItem.sendFilterCount.incrementAndGet();
+        metricItem.sendFilterSize.addAndGet(currentRecord.getBody().length);
+    }
+
+    private Map<String, String> getDimensions(ProfileEvent currentRecord, 
String bid) {
+        Map<String, String> dimensions = new HashMap<>();
+        dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        dimensions.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName());
+        // metric
+        fillInlongId(currentRecord, dimensions);
+        dimensions.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
+        dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, bid);
+        long msgTime = currentRecord.getRawLogTime();
+        long auditFormatTime = msgTime - msgTime % 
CommonPropertiesHolder.getAuditFormatInterval();
+        dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, 
String.valueOf(auditFormatTime));
+        return dimensions;
+    }
+
     /**
      * getIdConfig
      *

Reply via email to