This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b61de59e37f [FLINK-38773][table] Fix batch vector search excnode 
context (#27311)
b61de59e37f is described below

commit b61de59e37f08818d78afc17cf5ff7b6a5198c05
Author: Hao Li <[email protected]>
AuthorDate: Thu Dec 4 03:15:48 2025 -0800

    [FLINK-38773][table] Fix batch vector search excnode context (#27311)
---
 .../plan/nodes/exec/batch/BatchExecVectorSearchTableFunction.java   | 5 ++---
 .../apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java | 2 ++
 .../async-vector-search/plan/async-vector-search.json               | 6 +++---
 .../sync-vector-search/plan/sync-vector-search.json                 | 6 +++---
 .../plan/vector-search-with-runtime-config.json                     | 6 +++---
 5 files changed, 13 insertions(+), 12 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecVectorSearchTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecVectorSearchTableFunction.java
index 6fcbc4ec4b5..204da67ac2e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecVectorSearchTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecVectorSearchTableFunction.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTransl
 import 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecVectorSearchTableFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.VectorSearchSpec;
 import 
org.apache.flink.table.planner.plan.nodes.exec.spec.VectorSearchTableSourceSpec;
-import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecVectorSearchTableFunction;
 import org.apache.flink.table.planner.plan.utils.FunctionCallUtil;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -65,9 +64,9 @@ public class BatchExecVectorSearchTableFunction extends 
CommonExecVectorSearchTa
             String description) {
         this(
                 ExecNodeContext.newNodeId(),
-                
ExecNodeContext.newContext(StreamExecVectorSearchTableFunction.class),
+                
ExecNodeContext.newContext(BatchExecVectorSearchTableFunction.class),
                 ExecNodeContext.newPersistedConfig(
-                        StreamExecVectorSearchTableFunction.class, 
tableConfig),
+                        BatchExecVectorSearchTableFunction.class, tableConfig),
                 tableSourceSpec,
                 vectorSearchSpec,
                 asyncOptions,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index cd24cc9c82b..2aaeb5e8ea0 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -47,6 +47,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortLimit;
 import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecUnion;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecValues;
+import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecVectorSearchTableFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecWindowTableFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAsyncCalc;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAsyncCorrelate;
@@ -201,6 +202,7 @@ public final class ExecNodeMetadataUtil {
                     add(BatchExecMatch.class);
                     add(BatchExecOverAggregate.class);
                     add(BatchExecRank.class);
+                    add(BatchExecVectorSearchTableFunction.class);
                 }
             };
 
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json
index 1a98b09d924..8d9ba1a54de 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json
@@ -1,5 +1,5 @@
 {
-  "flinkVersion" : "2.2",
+  "flinkVersion" : "2.3",
   "nodes" : [ {
     "id" : 5,
     "type" : "batch-exec-table-source-scan_1",
@@ -24,10 +24,10 @@
     },
     "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector` 
ARRAY<FLOAT>>",
     "description" : "TableSourceScan(table=[[default_catalog, 
default_database, src_t]], fields=[id, content, vector])",
-    "dynamicFilteringDataListenerID" : "581f0987-928f-4c00-a1c1-500fdd1c98fb"
+    "dynamicFilteringDataListenerID" : "7068b1b2-f209-4e4b-8f16-77adc1440cb8"
   }, {
     "id" : 6,
-    "type" : "stream-exec-vector-search-table-function_1",
+    "type" : "batch-exec-vector-search-table-function_1",
     "configuration" : {
       "table.exec.async-vector-search.max-concurrent-operations" : "10",
       "table.exec.async-vector-search.output-mode" : "ORDERED",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json
index 3fea189cde0..f0f6ce74cdc 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json
@@ -1,5 +1,5 @@
 {
-  "flinkVersion" : "2.2",
+  "flinkVersion" : "2.3",
   "nodes" : [ {
     "id" : 1,
     "type" : "batch-exec-table-source-scan_1",
@@ -24,10 +24,10 @@
     },
     "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector` 
ARRAY<FLOAT>>",
     "description" : "TableSourceScan(table=[[default_catalog, 
default_database, src_t]], fields=[id, content, vector])",
-    "dynamicFilteringDataListenerID" : "5e850a71-a459-4906-9822-22c324f6f4c9"
+    "dynamicFilteringDataListenerID" : "19f6b4e7-e167-4785-8b10-4ab9bb286201"
   }, {
     "id" : 2,
-    "type" : "stream-exec-vector-search-table-function_1",
+    "type" : "batch-exec-vector-search-table-function_1",
     "configuration" : {
       "table.exec.async-vector-search.max-concurrent-operations" : "10",
       "table.exec.async-vector-search.output-mode" : "ORDERED",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/vector-search-with-runtime-config/plan/vector-search-with-runtime-config.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/vector-search-with-runtime-config/plan/vector-search-with-runtime-config.json
index 81b762fd38d..1bc02102276 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/vector-search-with-runtime-config/plan/vector-search-with-runtime-config.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/vector-search-with-runtime-config/plan/vector-search-with-runtime-config.json
@@ -1,5 +1,5 @@
 {
-  "flinkVersion" : "2.2",
+  "flinkVersion" : "2.3",
   "nodes" : [ {
     "id" : 9,
     "type" : "batch-exec-table-source-scan_1",
@@ -24,10 +24,10 @@
     },
     "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector` 
ARRAY<FLOAT>>",
     "description" : "TableSourceScan(table=[[default_catalog, 
default_database, src_t]], fields=[id, content, vector])",
-    "dynamicFilteringDataListenerID" : "21fdf54d-fce6-42da-bcd3-a522ebee5cd6"
+    "dynamicFilteringDataListenerID" : "733d0e86-4dba-4d5e-986a-22f1d385c6ef"
   }, {
     "id" : 10,
-    "type" : "stream-exec-vector-search-table-function_1",
+    "type" : "batch-exec-vector-search-table-function_1",
     "configuration" : {
       "table.exec.async-vector-search.max-concurrent-operations" : "10",
       "table.exec.async-vector-search.output-mode" : "ORDERED",

Reply via email to