This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 58186709c3 [INLONG-11888][Sort] Sort ElasticSearch supports format
conversion and data filtering via Key/Value data format (#11889)
58186709c3 is described below
commit 58186709c3c51f0bd041e7baf2dc7137b980c431
Author: ChunLiang Lu <[email protected]>
AuthorDate: Mon Jun 16 18:57:56 2025 +0800
[INLONG-11888][Sort] Sort ElasticSearch supports format conversion and
data filtering via Key/Value data format (#11889)
---
.../DefaultEvent2IndexRequestHandler.java | 43 +++++++++++++++++++---
.../sink/elasticsearch/EsChannelWorker.java | 14 +++++--
.../standalone/sink/elasticsearch/EsIdConfig.java | 3 ++
.../sink/elasticsearch/EsSinkContext.java | 21 +++++++++++
4 files changed, 72 insertions(+), 9 deletions(-)
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
index d67b60beca..0d91668e53 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/DefaultEvent2IndexRequestHandler.java
@@ -17,8 +17,12 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
import org.apache.inlong.sdk.commons.protocol.EventConstants;
import org.apache.inlong.sdk.transform.process.TransformProcessor;
+import org.apache.inlong.sort.formats.util.StringUtils;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.utils.UnescapeHelper;
@@ -26,6 +30,8 @@ import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -61,8 +67,7 @@ public class DefaultEvent2IndexRequestHandler implements
IEvent2IndexRequestHand
return null;
}
// parse fields
- String delimeter = idConfig.getSeparator();
- char cDelimeter = delimeter.charAt(0);
+ String strDelimiter = idConfig.getSeparator();
String strContext = null;
// for tab separator
byte[] bodyBytes = event.getBody();
@@ -73,9 +78,34 @@ public class DefaultEvent2IndexRequestHandler implements
IEvent2IndexRequestHand
} else {
strContext = new String(bodyBytes, Charset.defaultCharset());
}
+ List<String> columnValues = null;
+ if (idConfig.getDataTypeConfig() == null) {
+ char delimiter = idConfig.getSeparator().charAt(0);
+ columnValues = UnescapeHelper.toFiledList(strContext, delimiter);
+ } else {
+ DataTypeConfig dataTypeConfig = idConfig.getDataTypeConfig();
+ if (dataTypeConfig instanceof CsvConfig) {
+ CsvConfig csvConfig = (CsvConfig) dataTypeConfig;
+ String[] csvArray = StringUtils.splitCsv(strContext,
csvConfig.getDelimiter(),
+ csvConfig.getEscapeChar(), null);
+ columnValues = Arrays.asList(csvArray);
+ strDelimiter = csvConfig.getDelimiter().toString();
+ } else if (dataTypeConfig instanceof KvConfig) {
+ KvConfig kvConfig = (KvConfig) dataTypeConfig;
+ Map<String, String> kvMap = StringUtils.splitKv(strContext,
kvConfig.getEntrySplitter(),
+ kvConfig.getKvSplitter(), kvConfig.getEscapeChar(),
null);
+ List<String> listKeys = idConfig.getFieldList();
+ final List<String> finalListValues = new
ArrayList<>(listKeys.size());
+ listKeys.forEach(v -> finalListValues.add(kvMap.get(v)));
+ columnValues = finalListValues;
+ strDelimiter = kvConfig.getEntrySplitter().toString();
+ } else {
+ char delimiter = idConfig.getSeparator().charAt(0);
+ columnValues = UnescapeHelper.toFiledList(strContext,
delimiter);
+ }
+ }
// unescape
- List<String> columnVlues = UnescapeHelper.toFiledList(strContext,
cDelimeter);
- int valueLength = columnVlues.size();
+ int valueLength = columnValues.size();
List<String> fieldList = idConfig.getFieldList();
int columnLength = fieldList.size();
// field offset
@@ -85,7 +115,7 @@ public class DefaultEvent2IndexRequestHandler implements
IEvent2IndexRequestHand
for (int i = fieldOffset; i < columnLength; ++i) {
String fieldName = fieldList.get(i);
int columnIndex = i - fieldOffset;
- String fieldValue = columnIndex < valueLength ?
columnVlues.get(columnIndex) : "";
+ String fieldValue = columnIndex < valueLength ?
columnValues.get(columnIndex) : "";
byte[] fieldBytes = fieldValue.getBytes(Charset.defaultCharset());
if (fieldBytes.length > context.getKeywordMaxLength()) {
fieldValue = new String(fieldBytes, 0,
context.getKeywordMaxLength());
@@ -103,7 +133,8 @@ public class DefaultEvent2IndexRequestHandler implements
IEvent2IndexRequestHand
// build
EsIndexRequest indexRequest = new EsIndexRequest(indexName, event);
if (context.isUseIndexId()) {
- String esIndexId = uid + delimeter + event.getRawLogTime() +
delimeter + esIndexIndex.incrementAndGet();
+ String esIndexId =
+ uid + strDelimiter + event.getRawLogTime() + strDelimiter
+ esIndexIndex.incrementAndGet();
indexRequest.id(esIndexId);
}
indexRequest.source(fieldMap);
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
index 93a29e0142..877f815f9b 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
@@ -95,6 +95,15 @@ public class EsChannelWorker extends Thread {
}
// to profileEvent
ProfileEvent profileEvent = (ProfileEvent) event;
+ String uid = profileEvent.getUid();
+ EsIdConfig idConfig = context.getIdConfig(uid);
+ if (idConfig == null) {
+ tx.commit();
+ profileEvent.ack();
+ LOG.error("There is no id config for uid {}, discard it",
profileEvent.getUid());
+ context.addSendResultMetric(profileEvent,
context.getTaskName(), false, System.currentTimeMillis());
+ return;
+ }
TransformProcessor<String, Map<String, Object>> processor =
context.getTransformProcessor(profileEvent.getUid());
if (processor == null) {
@@ -103,8 +112,8 @@ public class EsChannelWorker extends Thread {
if (indexRequest != null) {
context.offerDispatchQueue(indexRequest);
} else {
- context.addSendFailMetric();
profileEvent.ack();
+ context.addSendResultMetric(profileEvent,
context.getTaskName(), false, System.currentTimeMillis());
}
} else {
List<EsIndexRequest> indexRequestList = handler.parse(
@@ -112,12 +121,11 @@ public class EsChannelWorker extends Thread {
if (CollectionUtils.isNotEmpty(indexRequestList)) {
indexRequestList.forEach(context::offerDispatchQueue);
} else {
- context.addSendFailMetric();
profileEvent.ack();
+ context.addSendFilterMetric(profileEvent, uid);
}
}
tx.commit();
-
} catch (Throwable t) {
LOG.error("Process event failed!" + this.getName(), t);
try {
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
index 3136a49276..06805bf764 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
import org.apache.inlong.common.pojo.sort.dataflow.sink.EsSinkConfig;
import org.apache.inlong.sort.standalone.config.pojo.IdConfig;
@@ -69,6 +70,7 @@ public class EsIdConfig extends IdConfig {
};
private String separator = "|";
+ private DataTypeConfig dataTypeConfig;
private String indexNamePattern;
private String fieldNames;
private int fieldOffset = 2; // for ftime,extinfo
@@ -97,6 +99,7 @@ public class EsIdConfig extends IdConfig {
.contentOffset(sinkConfig.getContentOffset())
.fieldOffset(sinkConfig.getFieldOffset())
.separator(sinkConfig.getSeparator())
+
.dataTypeConfig(dataFlowConfig.getSourceConfig().getDataTypeConfig())
.indexNamePattern(sinkConfig.getIndexNamePattern())
.fieldList(fields)
.charset(charset)
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index 9c9664d775..0a07ff3c31 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -419,6 +419,27 @@ public class EsSinkContext extends SinkContext {
}
}
+    /**
+     * Records one filtered-out event in the sort metrics: bumps the filter counter and adds the
+     * event body length to the filter size of the metric item matching the event's dimensions.
+     *
+     * @param currentRecord the event that was filtered out
+     * @param bid           the id stored as the sink data id dimension
+     */
+    public void addSendFilterMetric(ProfileEvent currentRecord, String bid) {
+        SortMetricItem item = this.getMetricItemSet()
+                .findMetricItem(this.getDimensions(currentRecord, bid));
+        item.sendFilterCount.incrementAndGet();
+        item.sendFilterSize.addAndGet(currentRecord.getBody().length);
+    }
+
+    /**
+     * Builds the metric dimension map for an event sent by this sink.
+     *
+     * @param currentRecord the event whose ids and raw log time populate the dimensions
+     * @param bid           value stored under the sink data id dimension
+     * @return a new mutable map of dimension key to value
+     */
+    private Map<String, String> getDimensions(ProfileEvent currentRecord, String bid) {
+        Map<String, String> dims = new HashMap<>();
+        dims.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        dims.put(SortMetricItem.KEY_TASK_NAME, this.getTaskName());
+        // adds the event's inlong id dimensions (presumably group/stream id; see fillInlongId)
+        fillInlongId(currentRecord, dims);
+        dims.put(SortMetricItem.KEY_SINK_ID, this.getSinkName());
+        dims.put(SortMetricItem.KEY_SINK_DATA_ID, bid);
+        // truncate the raw log time down to a multiple of the audit format interval
+        long rawLogTime = currentRecord.getRawLogTime();
+        long alignedTime = rawLogTime - rawLogTime % CommonPropertiesHolder.getAuditFormatInterval();
+        dims.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(alignedTime));
+        return dims;
+    }
+
/**
* getIdConfig
*