This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b61de59e37f [FLINK-38773][table] Fix batch vector search excnode context (#27311)
b61de59e37f is described below
commit b61de59e37f08818d78afc17cf5ff7b6a5198c05
Author: Hao Li <[email protected]>
AuthorDate: Thu Dec 4 03:15:48 2025 -0800
[FLINK-38773][table] Fix batch vector search excnode context (#27311)
---
.../plan/nodes/exec/batch/BatchExecVectorSearchTableFunction.java | 5 ++---
.../apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java | 2 ++
.../async-vector-search/plan/async-vector-search.json | 6 +++---
.../sync-vector-search/plan/sync-vector-search.json | 6 +++---
.../plan/vector-search-with-runtime-config.json | 6 +++---
5 files changed, 13 insertions(+), 12 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecVectorSearchTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecVectorSearchTableFunction.java
index 6fcbc4ec4b5..204da67ac2e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecVectorSearchTableFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecVectorSearchTableFunction.java
@@ -28,7 +28,6 @@ import
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTransl
import
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecVectorSearchTableFunction;
import org.apache.flink.table.planner.plan.nodes.exec.spec.VectorSearchSpec;
import
org.apache.flink.table.planner.plan.nodes.exec.spec.VectorSearchTableSourceSpec;
-import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecVectorSearchTableFunction;
import org.apache.flink.table.planner.plan.utils.FunctionCallUtil;
import org.apache.flink.table.types.logical.RowType;
@@ -65,9 +64,9 @@ public class BatchExecVectorSearchTableFunction extends
CommonExecVectorSearchTa
String description) {
this(
ExecNodeContext.newNodeId(),
- ExecNodeContext.newContext(StreamExecVectorSearchTableFunction.class),
+ ExecNodeContext.newContext(BatchExecVectorSearchTableFunction.class),
ExecNodeContext.newPersistedConfig(
- StreamExecVectorSearchTableFunction.class, tableConfig),
+ BatchExecVectorSearchTableFunction.class, tableConfig),
tableSourceSpec,
vectorSearchSpec,
asyncOptions,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index cd24cc9c82b..2aaeb5e8ea0 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -47,6 +47,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortLimit;
import
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecUnion;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecValues;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecVectorSearchTableFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecWindowTableFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAsyncCalc;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAsyncCorrelate;
@@ -201,6 +202,7 @@ public final class ExecNodeMetadataUtil {
add(BatchExecMatch.class);
add(BatchExecOverAggregate.class);
add(BatchExecRank.class);
+ add(BatchExecVectorSearchTableFunction.class);
}
};
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json
index 1a98b09d924..8d9ba1a54de 100644
--- a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json
@@ -1,5 +1,5 @@
{
- "flinkVersion" : "2.2",
+ "flinkVersion" : "2.3",
"nodes" : [ {
"id" : 5,
"type" : "batch-exec-table-source-scan_1",
@@ -24,10 +24,10 @@
},
"outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector` ARRAY<FLOAT>>",
"description" : "TableSourceScan(table=[[default_catalog, default_database, src_t]], fields=[id, content, vector])",
- "dynamicFilteringDataListenerID" : "581f0987-928f-4c00-a1c1-500fdd1c98fb"
+ "dynamicFilteringDataListenerID" : "7068b1b2-f209-4e4b-8f16-77adc1440cb8"
}, {
"id" : 6,
- "type" : "stream-exec-vector-search-table-function_1",
+ "type" : "batch-exec-vector-search-table-function_1",
"configuration" : {
"table.exec.async-vector-search.max-concurrent-operations" : "10",
"table.exec.async-vector-search.output-mode" : "ORDERED",
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json
index 3fea189cde0..f0f6ce74cdc 100644
--- a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json
@@ -1,5 +1,5 @@
{
- "flinkVersion" : "2.2",
+ "flinkVersion" : "2.3",
"nodes" : [ {
"id" : 1,
"type" : "batch-exec-table-source-scan_1",
@@ -24,10 +24,10 @@
},
"outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector` ARRAY<FLOAT>>",
"description" : "TableSourceScan(table=[[default_catalog, default_database, src_t]], fields=[id, content, vector])",
- "dynamicFilteringDataListenerID" : "5e850a71-a459-4906-9822-22c324f6f4c9"
+ "dynamicFilteringDataListenerID" : "19f6b4e7-e167-4785-8b10-4ab9bb286201"
}, {
"id" : 2,
- "type" : "stream-exec-vector-search-table-function_1",
+ "type" : "batch-exec-vector-search-table-function_1",
"configuration" : {
"table.exec.async-vector-search.max-concurrent-operations" : "10",
"table.exec.async-vector-search.output-mode" : "ORDERED",
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/vector-search-with-runtime-config/plan/vector-search-with-runtime-config.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/vector-search-with-runtime-config/plan/vector-search-with-runtime-config.json
index 81b762fd38d..1bc02102276 100644
--- a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/vector-search-with-runtime-config/plan/vector-search-with-runtime-config.json
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/vector-search-with-runtime-config/plan/vector-search-with-runtime-config.json
@@ -1,5 +1,5 @@
{
- "flinkVersion" : "2.2",
+ "flinkVersion" : "2.3",
"nodes" : [ {
"id" : 9,
"type" : "batch-exec-table-source-scan_1",
@@ -24,10 +24,10 @@
},
"outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector`
ARRAY<FLOAT>>",
"description" : "TableSourceScan(table=[[default_catalog,
default_database, src_t]], fields=[id, content, vector])",
- "dynamicFilteringDataListenerID" : "21fdf54d-fce6-42da-bcd3-a522ebee5cd6"
+ "dynamicFilteringDataListenerID" : "733d0e86-4dba-4d5e-986a-22f1d385c6ef"
}, {
"id" : 10,
- "type" : "stream-exec-vector-search-table-function_1",
+ "type" : "batch-exec-vector-search-table-function_1",
"configuration" : {
"table.exec.async-vector-search.max-concurrent-operations" : "10",
"table.exec.async-vector-search.output-mode" : "ORDERED",