This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch release-2.2 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8de272c7d9627b77d9a9b5693e5b1dff6ee4ddb0 Author: Hao Li <[email protected]> AuthorDate: Thu Dec 4 03:15:48 2025 -0800 [FLINK-38773][table] Fix batch vector search excnode context (#27311) --- .../plan/nodes/exec/batch/BatchExecVectorSearchTableFunction.java | 5 ++--- .../apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java | 2 ++ .../async-vector-search/plan/async-vector-search.json | 4 ++-- .../sync-vector-search/plan/sync-vector-search.json | 4 ++-- .../plan/vector-search-with-runtime-config.json | 4 ++-- 5 files changed, 10 insertions(+), 9 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecVectorSearchTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecVectorSearchTableFunction.java index 6fcbc4ec4b5..204da67ac2e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecVectorSearchTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecVectorSearchTableFunction.java @@ -28,7 +28,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTransl import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecVectorSearchTableFunction; import org.apache.flink.table.planner.plan.nodes.exec.spec.VectorSearchSpec; import org.apache.flink.table.planner.plan.nodes.exec.spec.VectorSearchTableSourceSpec; -import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecVectorSearchTableFunction; import org.apache.flink.table.planner.plan.utils.FunctionCallUtil; import org.apache.flink.table.types.logical.RowType; @@ -65,9 +64,9 @@ public class BatchExecVectorSearchTableFunction extends CommonExecVectorSearchTa String description) { this( ExecNodeContext.newNodeId(), - ExecNodeContext.newContext(StreamExecVectorSearchTableFunction.class), + ExecNodeContext.newContext(BatchExecVectorSearchTableFunction.class), ExecNodeContext.newPersistedConfig( - StreamExecVectorSearchTableFunction.class, tableConfig), + BatchExecVectorSearchTableFunction.class, tableConfig), tableSourceSpec, vectorSearchSpec, asyncOptions, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java index cd24cc9c82b..2aaeb5e8ea0 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java @@ -47,6 +47,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortLimit; import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan; import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecUnion; import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecValues; +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecVectorSearchTableFunction; import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecWindowTableFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAsyncCalc; import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAsyncCorrelate; @@ -201,6 +202,7 @@ public final class ExecNodeMetadataUtil { add(BatchExecMatch.class); add(BatchExecOverAggregate.class); add(BatchExecRank.class); + add(BatchExecVectorSearchTableFunction.class); } }; diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json index 1a98b09d924..840a4850a95 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json @@ -24,10 +24,10 @@ }, "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector` ARRAY<FLOAT>>", "description" : "TableSourceScan(table=[[default_catalog, default_database, src_t]], fields=[id, content, vector])", - "dynamicFilteringDataListenerID" : "581f0987-928f-4c00-a1c1-500fdd1c98fb" + "dynamicFilteringDataListenerID" : "1753d1db-2d8a-4ca8-9d85-18ec5670a027" }, { "id" : 6, - "type" : "stream-exec-vector-search-table-function_1", + "type" : "batch-exec-vector-search-table-function_1", "configuration" : { "table.exec.async-vector-search.max-concurrent-operations" : "10", "table.exec.async-vector-search.output-mode" : "ORDERED", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json index 3fea189cde0..2382525338c 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json @@ -24,10 +24,10 @@ }, "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector` ARRAY<FLOAT>>", "description" : "TableSourceScan(table=[[default_catalog, default_database, src_t]], fields=[id, content, vector])", - "dynamicFilteringDataListenerID" : "5e850a71-a459-4906-9822-22c324f6f4c9" + "dynamicFilteringDataListenerID" : "6bd08b9c-a3ed-4e52-8349-babb1e553ced" }, { "id" : 2, - "type" : "stream-exec-vector-search-table-function_1", + "type" : "batch-exec-vector-search-table-function_1", "configuration" : { "table.exec.async-vector-search.max-concurrent-operations" : "10", "table.exec.async-vector-search.output-mode" : "ORDERED", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/vector-search-with-runtime-config/plan/vector-search-with-runtime-config.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/vector-search-with-runtime-config/plan/vector-search-with-runtime-config.json index 81b762fd38d..3940bdda183 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/vector-search-with-runtime-config/plan/vector-search-with-runtime-config.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-vector-search-table-function_1/vector-search-with-runtime-config/plan/vector-search-with-runtime-config.json @@ -24,10 +24,10 @@ }, "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector` ARRAY<FLOAT>>", "description" : "TableSourceScan(table=[[default_catalog, default_database, src_t]], fields=[id, content, vector])", - "dynamicFilteringDataListenerID" : "21fdf54d-fce6-42da-bcd3-a522ebee5cd6" + "dynamicFilteringDataListenerID" : "24c9df65-8b48-4377-a315-b6bbff3e6333" }, { "id" : 10, - "type" : "stream-exec-vector-search-table-function_1", + "type" : "batch-exec-vector-search-table-function_1", "configuration" : { "table.exec.async-vector-search.max-concurrent-operations" : "10", "table.exec.async-vector-search.output-mode" : "ORDERED",
