This is an automated email from the ASF dual-hosted git repository.

zhanglistar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a5516b755 [GLUTEN-12339][FLINK] Bind native callback target for Gluten operators (#12274)
5a5516b755 is described below

commit 5a5516b7550c00d88747ba0146a3892464a6730c
Author: lgbo <[email protected]>
AuthorDate: Fri Jun 26 11:43:12 2026 +0800

    [GLUTEN-12339][FLINK] Bind native callback target for Gluten operators (#12274)
    
    * [FLINK] Bind native callback target for Gluten operators
    
    * [FLINK] Rename native processing time callback
    
    * [FLINK] Remove legacy JNI operator callback registry
    
    * [FLINK] Fix spotless formatting
    
    * [FLINK] Unbind native callback target on close
    
    * [FLINK] Guard native callback drain during close
---
 .../streaming/api/operators/GlutenOperator.java    | 14 -----
 .../table/runtime/operators/GlutenCloseables.java  | 44 ++++++++++++++
 .../runtime/operators/GlutenOneInputOperator.java  | 65 +++++++++++++++++----
 .../runtime/operators/GlutenSessionResources.java  | 15 -----
 .../runtime/operators/GlutenTwoInputOperator.java  | 67 +++++++++++++++++-----
 5 files changed, 150 insertions(+), 55 deletions(-)

diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenOperator.java
index bb6bc3bf54..666fd47024 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenOperator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenOperator.java
@@ -17,7 +17,6 @@
 package org.apache.gluten.streaming.api.operators;
 
 import org.apache.gluten.table.runtime.operators.GlutenMailboxHolder;
-import org.apache.gluten.table.runtime.operators.GlutenSessionResources;
 
 import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
 import io.github.zhztheplayer.velox4j.type.RowType;
@@ -64,19 +63,6 @@ public interface GlutenOperator {
     mailboxHolder().get().scheduleDrain(drainAction);
   }
 
-  /**
-   * Called from native Velox code to drain operator output. Drain is always 
scheduled on the Flink
-   * task mailbox thread.
-   */
-  static void processElementByJni(String operatorId) {
-    GlutenOperator operator =
-        
GlutenSessionResources.getInstance().getOperator(operatorId).orElse(null);
-    if (operator == null) {
-      throw new IllegalArgumentException("Operator not found: " + operatorId);
-    }
-    operator.scheduleProcessElementOnMailbox();
-  }
-
   /** Schedules native output drain on the mailbox thread. Implemented by 
concrete operators. */
   default void scheduleProcessElementOnMailbox() {}
 }
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCloseables.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCloseables.java
new file mode 100644
index 0000000000..782ec6163f
--- /dev/null
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCloseables.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.table.runtime.operators;
+
+final class GlutenCloseables {
+
+  private GlutenCloseables() {}
+
+  static void runWithCleanup(CloseAction... actions) throws Exception {
+    Exception failure = null;
+    for (CloseAction action : actions) {
+      try {
+        action.close();
+      } catch (Exception e) {
+        if (failure == null) {
+          failure = e;
+        } else {
+          failure.addSuppressed(e);
+        }
+      }
+    }
+    if (failure != null) {
+      throw failure;
+    }
+  }
+
+  interface CloseAction {
+    void close() throws Exception;
+  }
+}
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
index 09b059f3bc..05645fda9a 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
@@ -31,6 +31,7 @@ import io.github.zhztheplayer.velox4j.plan.TableScanNode;
 import io.github.zhztheplayer.velox4j.query.Query;
 import io.github.zhztheplayer.velox4j.query.SerialTask;
 import io.github.zhztheplayer.velox4j.serde.Serde;
+import io.github.zhztheplayer.velox4j.stateful.NativeCallbackTarget;
 import io.github.zhztheplayer.velox4j.stateful.StatefulElement;
 import io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
 import io.github.zhztheplayer.velox4j.stateful.StatefulWatermark;
@@ -49,7 +50,7 @@ import java.util.Map;
 
 /** Calculate operator in gluten, which will call Velox to run. */
 public class GlutenOneInputOperator<IN, OUT> extends TableStreamOperator<OUT>
-    implements OneInputStreamOperator<IN, OUT>, GlutenOperator {
+    implements OneInputStreamOperator<IN, OUT>, GlutenOperator, 
NativeCallbackTarget {
 
   private final StatefulPlanNode glutenPlan;
   private final String id;
@@ -62,6 +63,7 @@ public class GlutenOneInputOperator<IN, OUT> extends TableStreamOperator<OUT>
   private transient Query query;
   private transient ExternalStreams.BlockingQueue inputQueue;
   protected transient SerialTask task;
+  private transient volatile boolean closing;
   private final Class<IN> inClass;
   private final Class<OUT> outClass;
   private transient VectorInputBridge<IN> inputBridge;
@@ -119,7 +121,6 @@ public class GlutenOneInputOperator<IN, OUT> extends TableStreamOperator<OUT>
     }
     sessionResource = new GlutenSessionResource();
     GlutenSessionResources.getInstance().addSessionResource(id, 
sessionResource);
-    
GlutenSessionResources.getInstance().addOperator(this.getClass().getSimpleName(),
 this);
     inputQueue = 
sessionResource.getSession().externalStreamOps().newBlockingQueue();
     // add a mock input as velox not allow the source is empty.
     if (inputType == null) {
@@ -143,6 +144,7 @@ public class GlutenOneInputOperator<IN, OUT> extends TableStreamOperator<OUT>
             VeloxQueryConfig.getConfig(getRuntimeContext()),
             VeloxConnectorConfig.getConfig(getRuntimeContext()));
     task = sessionResource.getSession().queryOps().execute(query);
+    task.bindNativeCallbackTarget(this);
     task.addSplit(
         id, new ExternalStreamConnectorSplit("connector-external-stream", 
inputQueue.id()));
     task.noMoreSplits(id);
@@ -150,6 +152,7 @@ public class GlutenOneInputOperator<IN, OUT> extends TableStreamOperator<OUT>
 
   @Override
   public void open() throws Exception {
+    closing = false;
     super.open();
     if (!mailboxHolder().get().isMailboxBound()) {
       ensureMailboxInitialized(getContainingTask());
@@ -183,16 +186,36 @@ public class GlutenOneInputOperator<IN, OUT> extends TableStreamOperator<OUT>
 
   @Override
   public void scheduleProcessElementOnMailbox() {
+    if (closing) {
+      return;
+    }
     scheduleDrainOnMailbox(this::drainTaskOutput);
   }
 
+  @Override
+  public void onProcessingTime(long timestamp) {
+    if (closing) {
+      return;
+    }
+    scheduleProcessElementOnMailbox();
+  }
+
   @Override
   public void processElementInternal() {
+    if (closing) {
+      return;
+    }
     drainOutput(this::drainTaskOutput);
   }
 
   private void drainTaskOutput() {
+    if (closing) {
+      return;
+    }
     while (true) {
+      if (closing) {
+        return;
+      }
       UpIterator.State state = task.advance();
       if (state == UpIterator.State.AVAILABLE) {
         final StatefulElement statefulElement = task.statefulGet();
@@ -238,16 +261,34 @@ public class GlutenOneInputOperator<IN, OUT> extends TableStreamOperator<OUT>
 
   @Override
   public void close() throws Exception {
-    if (task != null) {
-      task.close();
-    }
-    if (inputQueue != null) {
-      inputQueue.noMoreInput();
-      inputQueue.close();
-    }
-    if (sessionResource != null) {
-      sessionResource.close();
-    }
+    closing = true;
+    GlutenCloseables.runWithCleanup(
+        () -> {
+          if (task != null) {
+            task.unbindNativeCallbackTarget();
+          }
+        },
+        () -> {
+          if (task != null) {
+            task.close();
+          }
+        },
+        () -> {
+          if (inputQueue != null) {
+            inputQueue.noMoreInput();
+          }
+        },
+        () -> {
+          if (inputQueue != null) {
+            inputQueue.close();
+          }
+        },
+        () -> {
+          if (sessionResource != null) {
+            sessionResource.close();
+          }
+        },
+        super::close);
   }
 
   @Override
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResources.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResources.java
index fac0784a24..cda410f719 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResources.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResources.java
@@ -16,8 +16,6 @@
  */
 package org.apache.gluten.table.runtime.operators;
 
-import org.apache.gluten.streaming.api.operators.GlutenOperator;
-
 import io.github.zhztheplayer.velox4j.Velox4j;
 import io.github.zhztheplayer.velox4j.memory.AllocationListener;
 import io.github.zhztheplayer.velox4j.memory.MemoryManager;
@@ -30,7 +28,6 @@ import org.apache.arrow.memory.RootAllocator;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
 // Manage the session and resource for Velox.
 class GlutenSessionResource {
@@ -84,7 +81,6 @@ class GlutenSessionResource {
 public class GlutenSessionResources {
   private static final GlutenSessionResources instance = new 
GlutenSessionResources();
   private Map<String, GlutenSessionResource> sessionResources = new 
HashMap<>();
-  private Map<String, GlutenOperator> operators = new HashMap<>();
 
   private GlutenSessionResources() {}
 
@@ -103,15 +99,4 @@ public class GlutenSessionResources {
   public Session getSession(String id) {
     return sessionResources.get(id).getSession();
   }
-
-  public void addOperator(String id, GlutenOperator operator) {
-    operators.put(id, operator);
-  }
-
-  public Optional<GlutenOperator> getOperator(String id) {
-    if (operators.containsKey(id)) {
-      return Optional.of(operators.get(id));
-    }
-    return Optional.empty();
-  }
 }
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
index 3f73ad4bbd..1251bb084e 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
@@ -28,6 +28,7 @@ import io.github.zhztheplayer.velox4j.iterator.UpIterator;
 import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
 import io.github.zhztheplayer.velox4j.query.Query;
 import io.github.zhztheplayer.velox4j.query.SerialTask;
+import io.github.zhztheplayer.velox4j.stateful.NativeCallbackTarget;
 import io.github.zhztheplayer.velox4j.stateful.StatefulElement;
 import io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
 import io.github.zhztheplayer.velox4j.stateful.StatefulWatermark;
@@ -50,7 +51,7 @@ import java.util.Map;
  * instead of flink RowData.
  */
 public class GlutenTwoInputOperator<IN, OUT> extends 
AbstractStreamOperator<OUT>
-    implements TwoInputStreamOperator<IN, IN, OUT>, GlutenOperator {
+    implements TwoInputStreamOperator<IN, IN, OUT>, GlutenOperator, 
NativeCallbackTarget {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(GlutenTwoInputOperator.class);
 
@@ -67,6 +68,7 @@ public class GlutenTwoInputOperator<IN, OUT> extends AbstractStreamOperator<OUT>
   private ExternalStreams.BlockingQueue leftInputQueue;
   private ExternalStreams.BlockingQueue rightInputQueue;
   private SerialTask task;
+  private transient volatile boolean closing;
   private final Class<IN> inClass;
   private final Class<OUT> outClass;
   private VectorInputBridge<IN> inputBridge;
@@ -117,6 +119,7 @@ public class GlutenTwoInputOperator<IN, OUT> extends AbstractStreamOperator<OUT>
 
   @Override
   public void open() throws Exception {
+    closing = false;
     super.open();
     if (!mailboxHolder().get().isMailboxBound()) {
       ensureMailboxInitialized(getContainingTask());
@@ -136,9 +139,20 @@ public class GlutenTwoInputOperator<IN, OUT> extends AbstractStreamOperator<OUT>
 
   @Override
   public void scheduleProcessElementOnMailbox() {
+    if (closing) {
+      return;
+    }
     scheduleDrainOnMailbox(this::drainTaskOutput);
   }
 
+  @Override
+  public void onProcessingTime(long timestamp) {
+    if (closing) {
+      return;
+    }
+    scheduleProcessElementOnMailbox();
+  }
+
   @Override
   public void processElement1(StreamRecord<IN> element) {
     StatefulRecord statefulRecord =
@@ -168,11 +182,20 @@ public class GlutenTwoInputOperator<IN, OUT> extends AbstractStreamOperator<OUT>
 
   @Override
   public void processElementInternal() {
+    if (closing) {
+      return;
+    }
     drainOutput(this::drainTaskOutput);
   }
 
   private void drainTaskOutput() {
+    if (closing) {
+      return;
+    }
     while (true) {
+      if (closing) {
+        return;
+      }
       UpIterator.State state = task.advance();
       if (state == UpIterator.State.AVAILABLE) {
         final StatefulElement element = task.statefulGet();
@@ -213,18 +236,34 @@ public class GlutenTwoInputOperator<IN, OUT> extends AbstractStreamOperator<OUT>
 
   @Override
   public void close() throws Exception {
-    if (leftInputQueue != null) {
-      leftInputQueue.close();
-    }
-    if (rightInputQueue != null) {
-      rightInputQueue.close();
-    }
-    if (task != null) {
-      task.close();
-    }
-    if (sessionResource != null) {
-      sessionResource.close();
-    }
+    closing = true;
+    GlutenCloseables.runWithCleanup(
+        () -> {
+          if (leftInputQueue != null) {
+            leftInputQueue.close();
+          }
+        },
+        () -> {
+          if (rightInputQueue != null) {
+            rightInputQueue.close();
+          }
+        },
+        () -> {
+          if (task != null) {
+            task.unbindNativeCallbackTarget();
+          }
+        },
+        () -> {
+          if (task != null) {
+            task.close();
+          }
+        },
+        () -> {
+          if (sessionResource != null) {
+            sessionResource.close();
+          }
+        },
+        super::close);
   }
 
   @Override
@@ -286,7 +325,6 @@ public class GlutenTwoInputOperator<IN, OUT> extends AbstractStreamOperator<OUT>
 
     sessionResource = new GlutenSessionResource();
     GlutenSessionResources.getInstance().addSessionResource(getId(), 
sessionResource);
-    GlutenSessionResources.getInstance().addOperator(getId(), this);
     leftInputQueue = 
sessionResource.getSession().externalStreamOps().newBlockingQueue();
     rightInputQueue = 
sessionResource.getSession().externalStreamOps().newBlockingQueue();
 
@@ -296,6 +334,7 @@ public class GlutenTwoInputOperator<IN, OUT> extends AbstractStreamOperator<OUT>
             VeloxQueryConfig.getConfig(getRuntimeContext()),
             VeloxConnectorConfig.getConfig(getRuntimeContext()));
     task = sessionResource.getSession().queryOps().execute(query);
+    task.bindNativeCallbackTarget(this);
 
     ExternalStreamConnectorSplit leftSplit =
         new ExternalStreamConnectorSplit("connector-external-stream", 
leftInputQueue.id());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to