This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bbd1177b72 [INLONG-11902][Sort] Allow SortCls to filter out data in 
TransformFunction (#11903)
bbd1177b72 is described below

commit bbd1177b72f57d3da31d21e1f87dd98590158c0c
Author: ChunLiang Lu <[email protected]>
AuthorDate: Mon Jun 23 18:32:47 2025 +0800

    [INLONG-11902][Sort] Allow SortCls to filter out data in TransformFunction 
(#11903)
    
    * [INLONG-11902][Sort] Allow SortCls to filter out data in TransformFunction
    
    * fix spotless problem
---
 .../inlong/sort/standalone/sink/cls/ClsChannelWorker.java     | 11 +++++++++--
 .../standalone/sink/elasticsearch/EsCallbackListener.java     |  3 +++
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
index 5b76dd5d2d..fe84c453be 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsChannelWorker.java
@@ -149,8 +149,15 @@ public class ClsChannelWorker extends Thread {
         } else {
             record = handler.parse(context, event, processor);
         }
-        ClsCallback callback = new ClsCallback(tx, context, event);
-        client.putLogs(idConfig.getTopicId(), record, callback);
+        if (record != null) {
+            ClsCallback callback = new ClsCallback(tx, context, event);
+            client.putLogs(idConfig.getTopicId(), record, callback);
+        } else {
+            context.addSendFilterMetric(event, idConfig.getTopicId());
+            event.ack();
+            tx.commit();
+            tx.close();
+        }
     }
 
     /** sleepOneInterval */
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
index 7acc483351..3265286834 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
@@ -21,6 +21,7 @@ import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
+import com.google.gson.Gson;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
@@ -90,6 +91,8 @@ public class EsCallbackListener implements 
BulkProcessor.Listener {
                 if (event.getSendedTime() <= 
CommonPropertiesHolder.getMaxSendFailTimes()) {
                     context.backDispatchQueue(requestItem);
                 } else {
+                    
LOG.error("afterBulk,executionId,executionId:{},request:{},Failure:{}",
+                            executionId, request, new 
Gson().toJson(responseItem.getFailure()));
                     event.negativeAck();
                 }
             } else {

Reply via email to