This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new bbd1177b72 [INLONG-11902][Sort] Allow SortCls to filter out data in TransformFunction (#11903)
bbd1177b72 is described below
commit bbd1177b72f57d3da31d21e1f87dd98590158c0c
Author: ChunLiang Lu <[email protected]>
AuthorDate: Mon Jun 23 18:32:47 2025 +0800
[INLONG-11902][Sort] Allow SortCls to filter out data in TransformFunction (#11903)
* [INLONG-11902][Sort] Allow SortCls to filter out data in TransformFunction
* fix spotless problem
---
.../inlong/sort/standalone/sink/cls/ClsChannelWorker.java | 11 +++++++++--
.../standalone/sink/elasticsearch/EsCallbackListener.java | 3 +++
2 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
index 5b76dd5d2d..fe84c453be 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
@@ -149,8 +149,15 @@ public class ClsChannelWorker extends Thread {
} else {
record = handler.parse(context, event, processor);
}
- ClsCallback callback = new ClsCallback(tx, context, event);
- client.putLogs(idConfig.getTopicId(), record, callback);
+ if (record != null) {
+ ClsCallback callback = new ClsCallback(tx, context, event);
+ client.putLogs(idConfig.getTopicId(), record, callback);
+ } else {
+ context.addSendFilterMetric(event, idConfig.getTopicId());
+ event.ack();
+ tx.commit();
+ tx.close();
+ }
}
/** sleepOneInterval */
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
index 7acc483351..3265286834 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
@@ -21,6 +21,7 @@ import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import com.google.gson.Gson;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
@@ -90,6 +91,8 @@ public class EsCallbackListener implements BulkProcessor.Listener {
if (event.getSendedTime() <= CommonPropertiesHolder.getMaxSendFailTimes()) {
context.backDispatchQueue(requestItem);
} else {
+ LOG.error("afterBulk,executionId,executionId:{},request:{},Failure:{}",
+ executionId, request, new Gson().toJson(responseItem.getFailure()));
event.negativeAck();
}
} else {