This is an automated email from the ASF dual-hosted git repository.
zhanglistar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 5a5516b755 [GLUTEN-12339][FLINK] Bind native callback target for Gluten operators (#12274)
5a5516b755 is described below
commit 5a5516b7550c00d88747ba0146a3892464a6730c
Author: lgbo <[email protected]>
AuthorDate: Fri Jun 26 11:43:12 2026 +0800
[GLUTEN-12339][FLINK] Bind native callback target for Gluten operators (#12274)
* [FLINK] Bind native callback target for Gluten operators
* [FLINK] Rename native processing time callback
* [FLINK] Remove legacy JNI operator callback registry
* [FLINK] Fix spotless formatting
* [FLINK] Unbind native callback target on close
* [FLINK] Guard native callback drain during close
---
.../streaming/api/operators/GlutenOperator.java | 14 -----
.../table/runtime/operators/GlutenCloseables.java | 44 ++++++++++++++
.../runtime/operators/GlutenOneInputOperator.java | 65 +++++++++++++++++----
.../runtime/operators/GlutenSessionResources.java | 15 -----
.../runtime/operators/GlutenTwoInputOperator.java | 67 +++++++++++++++++-----
5 files changed, 150 insertions(+), 55 deletions(-)
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenOperator.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenOperator.java
index bb6bc3bf54..666fd47024 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenOperator.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/api/operators/GlutenOperator.java
@@ -17,7 +17,6 @@
package org.apache.gluten.streaming.api.operators;
import org.apache.gluten.table.runtime.operators.GlutenMailboxHolder;
-import org.apache.gluten.table.runtime.operators.GlutenSessionResources;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
import io.github.zhztheplayer.velox4j.type.RowType;
@@ -64,19 +63,6 @@ public interface GlutenOperator {
mailboxHolder().get().scheduleDrain(drainAction);
}
- /**
- * Called from native Velox code to drain operator output. Drain is always
scheduled on the Flink
- * task mailbox thread.
- */
- static void processElementByJni(String operatorId) {
- GlutenOperator operator =
-
GlutenSessionResources.getInstance().getOperator(operatorId).orElse(null);
- if (operator == null) {
- throw new IllegalArgumentException("Operator not found: " + operatorId);
- }
- operator.scheduleProcessElementOnMailbox();
- }
-
/** Schedules native output drain on the mailbox thread. Implemented by concrete operators. */
default void scheduleProcessElementOnMailbox() {}
}
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCloseables.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCloseables.java
new file mode 100644
index 0000000000..782ec6163f
--- /dev/null
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCloseables.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.table.runtime.operators;
+
+/** Helpers for releasing several resources in one close path without leaking on failure. */
+final class GlutenCloseables {
+
+ private GlutenCloseables() {}
+
+ /**
+ * Runs every close action in order, continuing past failures so that one failing step does not
+ * leak the resources released by later steps.
+ *
+ * <p>Like try-with-resources, every action is attempted; the first failure is rethrown once all
+ * actions have run, and each later failure is attached to it via {@link
+ * Throwable#addSuppressed}. Errors are collected as well, so an {@link Error} thrown by one
+ * action does not skip the remaining cleanup.
+ *
+ * @param actions cleanup steps, executed in the given order
+ * @throws Exception the first exception thrown by any action, if that failure was an exception
+ * @throws Error the first failure, if it was an {@link Error}
+ */
+ static void runWithCleanup(CloseAction... actions) throws Exception {
+ Throwable failure = null;
+ for (CloseAction action : actions) {
+ try {
+ action.close();
+ } catch (Throwable t) {
+ if (failure == null) {
+ failure = t;
+ } else if (failure != t) {
+ // Self-suppression throws IllegalArgumentException, which would escape this catch
+ // block and abort the remaining actions.
+ failure.addSuppressed(t);
+ }
+ }
+ }
+ if (failure instanceof Error) {
+ throw (Error) failure;
+ }
+ if (failure instanceof Exception) {
+ throw (Exception) failure;
+ }
+ if (failure != null) {
+ // Only reachable if an action sneaky-throws a Throwable that is neither Exception nor Error.
+ throw new IllegalStateException("Unexpected throwable during cleanup", failure);
+ }
+ }
+
+ /** A single cleanup step that may fail with a checked exception. */
+ @FunctionalInterface
+ interface CloseAction {
+ void close() throws Exception;
+ }
+}
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
index 09b059f3bc..05645fda9a 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
@@ -31,6 +31,7 @@ import io.github.zhztheplayer.velox4j.plan.TableScanNode;
import io.github.zhztheplayer.velox4j.query.Query;
import io.github.zhztheplayer.velox4j.query.SerialTask;
import io.github.zhztheplayer.velox4j.serde.Serde;
+import io.github.zhztheplayer.velox4j.stateful.NativeCallbackTarget;
import io.github.zhztheplayer.velox4j.stateful.StatefulElement;
import io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
import io.github.zhztheplayer.velox4j.stateful.StatefulWatermark;
@@ -49,7 +50,7 @@ import java.util.Map;
/** Calculate operator in gluten, which will call Velox to run. */
public class GlutenOneInputOperator<IN, OUT> extends TableStreamOperator<OUT>
- implements OneInputStreamOperator<IN, OUT>, GlutenOperator {
+ implements OneInputStreamOperator<IN, OUT>, GlutenOperator,
NativeCallbackTarget {
private final StatefulPlanNode glutenPlan;
private final String id;
@@ -62,6 +63,7 @@ public class GlutenOneInputOperator<IN, OUT> extends
TableStreamOperator<OUT>
private transient Query query;
private transient ExternalStreams.BlockingQueue inputQueue;
protected transient SerialTask task;
+ private transient volatile boolean closing;
private final Class<IN> inClass;
private final Class<OUT> outClass;
private transient VectorInputBridge<IN> inputBridge;
@@ -119,7 +121,6 @@ public class GlutenOneInputOperator<IN, OUT> extends
TableStreamOperator<OUT>
}
sessionResource = new GlutenSessionResource();
GlutenSessionResources.getInstance().addSessionResource(id,
sessionResource);
-
GlutenSessionResources.getInstance().addOperator(this.getClass().getSimpleName(),
this);
inputQueue =
sessionResource.getSession().externalStreamOps().newBlockingQueue();
// add a mock input as velox not allow the source is empty.
if (inputType == null) {
@@ -143,6 +144,7 @@ public class GlutenOneInputOperator<IN, OUT> extends
TableStreamOperator<OUT>
VeloxQueryConfig.getConfig(getRuntimeContext()),
VeloxConnectorConfig.getConfig(getRuntimeContext()));
task = sessionResource.getSession().queryOps().execute(query);
+ task.bindNativeCallbackTarget(this);
task.addSplit(
id, new ExternalStreamConnectorSplit("connector-external-stream",
inputQueue.id()));
task.noMoreSplits(id);
@@ -150,6 +152,7 @@ public class GlutenOneInputOperator<IN, OUT> extends
TableStreamOperator<OUT>
@Override
public void open() throws Exception {
+ closing = false;
super.open();
if (!mailboxHolder().get().isMailboxBound()) {
ensureMailboxInitialized(getContainingTask());
@@ -183,16 +186,36 @@ public class GlutenOneInputOperator<IN, OUT> extends
TableStreamOperator<OUT>
@Override
public void scheduleProcessElementOnMailbox() {
+ // Drop drain requests that arrive after close() has started; the native task is being
+ // released at that point.
+ if (closing) {
+ return;
+ }
scheduleDrainOnMailbox(this::drainTaskOutput);
}
+ /**
+ * Processing-time callback received through the native callback target bound to the task.
+ * The timestamp is not used: the callback only requests a drain of native output on the
+ * mailbox thread, and is ignored once the operator is closing.
+ */
+ @Override
+ public void onProcessingTime(long timestamp) {
+ if (closing) {
+ return;
+ }
+ scheduleProcessElementOnMailbox();
+ }
+
@Override
public void processElementInternal() {
+ // A drain that was already queued on the mailbox may run after close() has begun.
+ if (closing) {
+ return;
+ }
drainOutput(this::drainTaskOutput);
}
private void drainTaskOutput() {
+ if (closing) {
+ return;
+ }
while (true) {
+ if (closing) {
+ return;
+ }
UpIterator.State state = task.advance();
if (state == UpIterator.State.AVAILABLE) {
final StatefulElement statefulElement = task.statefulGet();
@@ -238,16 +261,34 @@ public class GlutenOneInputOperator<IN, OUT> extends
TableStreamOperator<OUT>
@Override
public void close() throws Exception {
- if (task != null) {
- task.close();
- }
- if (inputQueue != null) {
- inputQueue.noMoreInput();
- inputQueue.close();
- }
- if (sessionResource != null) {
- sessionResource.close();
- }
+ // Flag first so that native callbacks and already-queued mailbox drains become no-ops while
+ // resources are released below.
+ closing = true;
+ // Release order: detach the callback target, close the native task, finish and close its
+ // input queue, then the session. Each step runs even if an earlier one throws; the first
+ // failure is rethrown at the end.
+ GlutenCloseables.runWithCleanup(
+ () -> {
+ if (task != null) {
+ task.unbindNativeCallbackTarget();
+ }
+ },
+ () -> {
+ if (task != null) {
+ task.close();
+ }
+ },
+ () -> {
+ if (inputQueue != null) {
+ inputQueue.noMoreInput();
+ }
+ },
+ () -> {
+ if (inputQueue != null) {
+ inputQueue.close();
+ }
+ },
+ () -> {
+ if (sessionResource != null) {
+ sessionResource.close();
+ }
+ },
+ super::close);
}
@Override
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResources.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResources.java
index fac0784a24..cda410f719 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResources.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResources.java
@@ -16,8 +16,6 @@
*/
package org.apache.gluten.table.runtime.operators;
-import org.apache.gluten.streaming.api.operators.GlutenOperator;
-
import io.github.zhztheplayer.velox4j.Velox4j;
import io.github.zhztheplayer.velox4j.memory.AllocationListener;
import io.github.zhztheplayer.velox4j.memory.MemoryManager;
@@ -30,7 +28,6 @@ import org.apache.arrow.memory.RootAllocator;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
// Manage the session and resource for Velox.
class GlutenSessionResource {
@@ -84,7 +81,6 @@ class GlutenSessionResource {
public class GlutenSessionResources {
private static final GlutenSessionResources instance = new
GlutenSessionResources();
private Map<String, GlutenSessionResource> sessionResources = new
HashMap<>();
- private Map<String, GlutenOperator> operators = new HashMap<>();
private GlutenSessionResources() {}
@@ -103,15 +99,4 @@ public class GlutenSessionResources {
public Session getSession(String id) {
return sessionResources.get(id).getSession();
}
-
- public void addOperator(String id, GlutenOperator operator) {
- operators.put(id, operator);
- }
-
- public Optional<GlutenOperator> getOperator(String id) {
- if (operators.containsKey(id)) {
- return Optional.of(operators.get(id));
- }
- return Optional.empty();
- }
}
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
index 3f73ad4bbd..1251bb084e 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
@@ -28,6 +28,7 @@ import io.github.zhztheplayer.velox4j.iterator.UpIterator;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
import io.github.zhztheplayer.velox4j.query.Query;
import io.github.zhztheplayer.velox4j.query.SerialTask;
+import io.github.zhztheplayer.velox4j.stateful.NativeCallbackTarget;
import io.github.zhztheplayer.velox4j.stateful.StatefulElement;
import io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
import io.github.zhztheplayer.velox4j.stateful.StatefulWatermark;
@@ -50,7 +51,7 @@ import java.util.Map;
* instead of flink RowData.
*/
public class GlutenTwoInputOperator<IN, OUT> extends
AbstractStreamOperator<OUT>
- implements TwoInputStreamOperator<IN, IN, OUT>, GlutenOperator {
+ implements TwoInputStreamOperator<IN, IN, OUT>, GlutenOperator,
NativeCallbackTarget {
private static final Logger LOG =
LoggerFactory.getLogger(GlutenTwoInputOperator.class);
@@ -67,6 +68,7 @@ public class GlutenTwoInputOperator<IN, OUT> extends
AbstractStreamOperator<OUT>
private ExternalStreams.BlockingQueue leftInputQueue;
private ExternalStreams.BlockingQueue rightInputQueue;
private SerialTask task;
+ private transient volatile boolean closing;
private final Class<IN> inClass;
private final Class<OUT> outClass;
private VectorInputBridge<IN> inputBridge;
@@ -117,6 +119,7 @@ public class GlutenTwoInputOperator<IN, OUT> extends
AbstractStreamOperator<OUT>
@Override
public void open() throws Exception {
+ closing = false;
super.open();
if (!mailboxHolder().get().isMailboxBound()) {
ensureMailboxInitialized(getContainingTask());
@@ -136,9 +139,20 @@ public class GlutenTwoInputOperator<IN, OUT> extends
AbstractStreamOperator<OUT>
@Override
public void scheduleProcessElementOnMailbox() {
+ // Drop drain requests that arrive after close() has started; the native task is being
+ // released at that point.
+ if (closing) {
+ return;
+ }
scheduleDrainOnMailbox(this::drainTaskOutput);
}
+ /**
+ * Processing-time callback received through the native callback target bound to the task.
+ * The timestamp is not used: the callback only requests a drain of native output on the
+ * mailbox thread, and is ignored once the operator is closing.
+ */
+ @Override
+ public void onProcessingTime(long timestamp) {
+ if (closing) {
+ return;
+ }
+ scheduleProcessElementOnMailbox();
+ }
+
+
@Override
public void processElement1(StreamRecord<IN> element) {
StatefulRecord statefulRecord =
@@ -168,11 +182,20 @@ public class GlutenTwoInputOperator<IN, OUT> extends
AbstractStreamOperator<OUT>
@Override
public void processElementInternal() {
+ if (closing) {
+ return;
+ }
drainOutput(this::drainTaskOutput);
}
private void drainTaskOutput() {
+ if (closing) {
+ return;
+ }
while (true) {
+ if (closing) {
+ return;
+ }
UpIterator.State state = task.advance();
if (state == UpIterator.State.AVAILABLE) {
final StatefulElement element = task.statefulGet();
@@ -213,18 +236,34 @@ public class GlutenTwoInputOperator<IN, OUT> extends
AbstractStreamOperator<OUT>
@Override
public void close() throws Exception {
- if (leftInputQueue != null) {
- leftInputQueue.close();
- }
- if (rightInputQueue != null) {
- rightInputQueue.close();
- }
- if (task != null) {
- task.close();
- }
- if (sessionResource != null) {
- sessionResource.close();
- }
+ // Flag first so that native callbacks and already-queued mailbox drains become no-ops while
+ // resources are released below.
+ closing = true;
+ // Release in the same order as GlutenOneInputOperator.close(): detach the callback target
+ // and close the native task (its splits reference the queue ids) before closing the input
+ // queues, then the session. Each step runs even if an earlier one throws; the first failure
+ // is rethrown at the end.
+ GlutenCloseables.runWithCleanup(
+ () -> {
+ if (task != null) {
+ task.unbindNativeCallbackTarget();
+ }
+ },
+ () -> {
+ if (task != null) {
+ task.close();
+ }
+ },
+ () -> {
+ if (leftInputQueue != null) {
+ leftInputQueue.close();
+ }
+ },
+ () -> {
+ if (rightInputQueue != null) {
+ rightInputQueue.close();
+ }
+ },
+ () -> {
+ if (sessionResource != null) {
+ sessionResource.close();
+ }
+ },
+ super::close);
}
@Override
@@ -286,7 +325,6 @@ public class GlutenTwoInputOperator<IN, OUT> extends
AbstractStreamOperator<OUT>
sessionResource = new GlutenSessionResource();
GlutenSessionResources.getInstance().addSessionResource(getId(),
sessionResource);
- GlutenSessionResources.getInstance().addOperator(getId(), this);
leftInputQueue =
sessionResource.getSession().externalStreamOps().newBlockingQueue();
rightInputQueue =
sessionResource.getSession().externalStreamOps().newBlockingQueue();
@@ -296,6 +334,7 @@ public class GlutenTwoInputOperator<IN, OUT> extends
AbstractStreamOperator<OUT>
VeloxQueryConfig.getConfig(getRuntimeContext()),
VeloxConnectorConfig.getConfig(getRuntimeContext()));
task = sessionResource.getSession().queryOps().execute(query);
+ task.bindNativeCallbackTarget(this);
ExternalStreamConnectorSplit leftSplit =
new ExternalStreamConnectorSplit("connector-external-stream",
leftInputQueue.id());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]