This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 64ce0c28363 [FLINK-38427][table] Support to serde VECTOR_SEARCH exec 
node (#27127)
64ce0c28363 is described below

commit 64ce0c283633fed3715558d59d9de9e411289718
Author: Shengkai <[email protected]>
AuthorDate: Thu Oct 23 15:48:01 2025 +0800

    [FLINK-38427][table] Support to serde VECTOR_SEARCH exec node (#27127)
---
 .../exec/spec/VectorSearchTableSourceSpec.java     |  59 ++++++-
 .../StreamExecVectorSearchTableFunction.java       |  56 ++++++-
 .../planner/plan/utils/ExecNodeMetadataUtil.java   |   2 +-
 .../planner/factories/TestValuesTableFactory.java  |  24 +++
 .../VectorSearchTableSourceSpecSerdeTest.java      | 132 ++++++++++++++++
 .../nodes/exec/stream/VectorSearchRestoreTest.java |  41 +++++
 .../exec/stream/VectorSearchTestPrograms.java      | 107 +++++++++++++
 .../plan/async-vector-search.json                  | 175 +++++++++++++++++++++
 .../async-vector-search/savepoint/_metadata        | Bin 0 -> 10882 bytes
 .../plan/sync-vector-search.json                   | 171 ++++++++++++++++++++
 .../sync-vector-search/savepoint/_metadata         | Bin 0 -> 8385 bytes
 11 files changed, 758 insertions(+), 9 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/VectorSearchTableSourceSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/VectorSearchTableSourceSpec.java
index fd5a032d390..ba3fefd1379 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/VectorSearchTableSourceSpec.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/VectorSearchTableSourceSpec.java
@@ -18,11 +18,24 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.spec;
 
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.VectorSearchTableSource;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.type.RelDataType;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 
 /**
@@ -30,11 +43,19 @@ import java.util.Arrays;
  *
  * <p>This class corresponds to {@link RelOptTable} rel node.
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class VectorSearchTableSourceSpec {
 
+    public static final String FIELD_NAME_VECTOR_SEARCH_TABLE_SOURCE = 
"vectorSearchTableSource";
+    public static final String FIELD_NAME_OUTPUT_TYPE = "outputType";
+
+    @JsonProperty(FIELD_NAME_VECTOR_SEARCH_TABLE_SOURCE)
     private final DynamicTableSourceSpec tableSourceSpec;
+
+    @JsonProperty(FIELD_NAME_OUTPUT_TYPE)
     private final RelDataType outputType;
-    private final TableSourceTable searchTable;
+
+    @JsonIgnore private @Nullable TableSourceTable searchTable;
 
     public VectorSearchTableSourceSpec(TableSourceTable searchTable) {
         this.searchTable = searchTable;
@@ -45,8 +66,40 @@ public class VectorSearchTableSourceSpec {
                         Arrays.asList(searchTable.abilitySpecs()));
     }
 
-    public TableSourceTable getSearchTable() {
-        return searchTable;
+    @JsonCreator
+    public VectorSearchTableSourceSpec(
+            @JsonProperty(FIELD_NAME_VECTOR_SEARCH_TABLE_SOURCE)
+                    DynamicTableSourceSpec tableSourceSpec,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RelDataType outputType) {
+        this.tableSourceSpec = tableSourceSpec;
+        this.outputType = outputType;
+    }
+
+    @JsonIgnore
+    public TableSourceTable getSearchTable(FlinkContext context, 
FlinkTypeFactory typeFactory) {
+        if (null != searchTable) {
+            return searchTable;
+        }
+        if (null != tableSourceSpec && null != outputType) {
+            VectorSearchTableSource vectorSearchTableSource =
+                    tableSourceSpec.getVectorSearchTableSource(context, 
typeFactory);
+            SourceAbilitySpec[] sourceAbilitySpecs = null;
+            if (null != tableSourceSpec.getSourceAbilities()) {
+                sourceAbilitySpecs =
+                        tableSourceSpec.getSourceAbilities().toArray(new 
SourceAbilitySpec[0]);
+            }
+            return new TableSourceTable(
+                    null,
+                    outputType,
+                    FlinkStatistic.UNKNOWN(),
+                    vectorSearchTableSource,
+                    true,
+                    tableSourceSpec.getContextResolvedTable(),
+                    context,
+                    typeFactory,
+                    sourceAbilitySpecs);
+        }
+        throw new TableException("Can not obtain searchTable correctly!");
     }
 
     public DynamicTableSourceSpec getTableSourceSpec() {
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
index c0072e67f9e..a411452c099 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.PipelineOptions;
@@ -47,6 +48,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import 
org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.VectorSearchSpec;
@@ -65,6 +67,9 @@ import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.core.JoinRelType;
 
@@ -72,33 +77,73 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 /** Stream {@link ExecNode} for {@code VECTOR_SEARCH}. */
+@ExecNodeMetadata(
+        name = "stream-exec-vector-search-table-function",
+        version = 1,
+        consumedOptions = {
+            "table.exec.async-vector-search.max-concurrent-operations",
+            "table.exec.async-vector-search.timeout",
+            "table.exec.async-vector-search.output-mode"
+        },
+        // Uses this node's own transformation name constant (declared in this class).
+        producedTransformations = 
StreamExecVectorSearchTableFunction.VECTOR_SEARCH_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v2_2,
+        minStateVersion = FlinkVersion.v2_2)
 public class StreamExecVectorSearchTableFunction extends ExecNodeBase<RowData>
         implements MultipleTransformationTranslator<RowData>, 
StreamExecNode<RowData> {
 
     public static final String VECTOR_SEARCH_TRANSFORMATION = 
"vector-search-table-function";
-    private final VectorSearchTableSourceSpec vectorSearchTableSourceSpec;
+
+    private static final String FIELD_NAME_TABLE_SOURCE_SPEC = 
"tableSourceSpec";
+    private static final String FIELD_NAME_VECTOR_SEARCH_SPEC = 
"vectorSearchSpec";
+    private static final String FIELD_NAME_ASYNC_OPTIONS = "asyncOptions";
+
+    @JsonProperty(FIELD_NAME_TABLE_SOURCE_SPEC)
+    private final VectorSearchTableSourceSpec tableSourceSpec;
+
+    @JsonProperty(FIELD_NAME_VECTOR_SEARCH_SPEC)
     private final VectorSearchSpec vectorSearchSpec;
+
+    @JsonProperty(FIELD_NAME_ASYNC_OPTIONS)
     private final @Nullable FunctionCallUtil.AsyncOptions asyncOptions;
 
     public StreamExecVectorSearchTableFunction(
             ReadableConfig tableConfig,
-            VectorSearchTableSourceSpec vectorSearchTableSourceSpec,
+            VectorSearchTableSourceSpec tableSourceSpec,
             VectorSearchSpec vectorSearchSpec,
             @Nullable FunctionCallUtil.AsyncOptions asyncOptions,
             InputProperty inputProperty,
             RowType outputType,
             String description) {
-        super(
+        this(
                 ExecNodeContext.newNodeId(),
                 
ExecNodeContext.newContext(StreamExecVectorSearchTableFunction.class),
                 ExecNodeContext.newPersistedConfig(
                         StreamExecVectorSearchTableFunction.class, 
tableConfig),
+                tableSourceSpec,
+                vectorSearchSpec,
+                asyncOptions,
                 Collections.singletonList(inputProperty),
                 outputType,
                 description);
-        this.vectorSearchTableSourceSpec = vectorSearchTableSourceSpec;
+    }
+
+    @JsonCreator
+    public StreamExecVectorSearchTableFunction(
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig 
persistedConfig,
+            @JsonProperty(FIELD_NAME_TABLE_SOURCE_SPEC) 
VectorSearchTableSourceSpec tableSourceSpec,
+            @JsonProperty(FIELD_NAME_VECTOR_SEARCH_SPEC) VectorSearchSpec 
vectorSearchSpec,
+            @JsonProperty(FIELD_NAME_ASYNC_OPTIONS) @Nullable
+                    FunctionCallUtil.AsyncOptions asyncOptions,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+        super(id, context, persistedConfig, inputProperties, outputType, 
description);
+        this.tableSourceSpec = tableSourceSpec;
         this.vectorSearchSpec = vectorSearchSpec;
         this.asyncOptions = asyncOptions;
     }
@@ -111,7 +156,8 @@ public class StreamExecVectorSearchTableFunction extends 
ExecNodeBase<RowData>
         Transformation<RowData> inputTransformation =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
         // 2. extract search function
-        TableSourceTable searchTable = 
vectorSearchTableSourceSpec.getSearchTable();
+        TableSourceTable searchTable =
+                tableSourceSpec.getSearchTable(planner.getFlinkContext(), 
planner.getTypeFactory());
         boolean isAsyncEnabled = asyncOptions != null;
         UserDefinedFunction vectorSearchFunction =
                 findVectorSearchFunction(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index f860d7104cc..cd24cc9c82b 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -179,6 +179,7 @@ public final class ExecNodeMetadataUtil {
                     add(StreamExecPythonOverAggregate.class);
                     add(StreamExecMLPredictTableFunction.class);
                     add(StreamExecDeltaJoin.class);
+                    add(StreamExecVectorSearchTableFunction.class);
                     // Batch execution mode
                     add(BatchExecSink.class);
                     add(BatchExecTableSourceScan.class);
@@ -222,7 +223,6 @@ public final class ExecNodeMetadataUtil {
                     add(StreamExecGroupTableAggregate.class);
                     add(StreamExecPythonGroupTableAggregate.class);
                     add(StreamExecMultipleInput.class);
-                    add(StreamExecVectorSearchTableFunction.class);
                 }
             };
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index cebde11ac66..79387ab9285 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -2215,6 +2215,30 @@ public final class TestValuesTableFactory
         }
     }
 
+    /** A mocked {@link VectorSearchTableSource} for validation test. */
+    public static class MockedVectorSearchTableSource implements 
VectorSearchTableSource {
+        @Override
+        public VectorSearchRuntimeProvider 
getSearchRuntimeProvider(VectorSearchContext context) {
+            return VectorSearchFunctionProvider.of(
+                    new VectorSearchFunction() {
+                        @Override
+                        public Collection<RowData> vectorSearch(int topK, 
RowData queryData) {
+                            return Collections.emptyList();
+                        }
+                    });
+        }
+
+        @Override
+        public DynamicTableSource copy() {
+            throw new UnsupportedOperationException("Not implemented.");
+        }
+
+        @Override
+        public String asSummaryString() {
+            return "MockedVectorSearchSource";
+        }
+    }
+
     /**
      * Values {@link ScanTableSource} which collects the registered {@link 
RowData} directly, sleeps
      * {@link #sleepTimeMillis} every {@link #sleepAfterElements} elements.
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/VectorSearchTableSourceSpecSerdeTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/VectorSearchTableSourceSpecSerdeTest.java
new file mode 100644
index 00000000000..8604882b7de
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/VectorSearchTableSourceSpecSerdeTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.plan.abilities.source.LimitPushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import 
org.apache.flink.table.planner.plan.nodes.exec.spec.VectorSearchTableSourceSpec;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
+
+/** Tests for {@link VectorSearchTableSourceSpec} serialization and 
deserialization. */
+@Execution(CONCURRENT)
+public class VectorSearchTableSourceSpecSerdeTest {
+
+    // Type factory shared by all test cases; built from this test class's own class loader.
+    private static final FlinkTypeFactory FACTORY =
+            new FlinkTypeFactory(
+                    VectorSearchTableSourceSpecSerdeTest.class.getClassLoader(),
+                    FlinkTypeSystem.INSTANCE);
+
+    private static final FlinkContext FLINK_CONTEXT =
+            JsonSerdeTestUtil.configuredSerdeContext().getFlinkContext();
+
+    public static Stream<VectorSearchTableSourceSpec> 
testVectorSearchTableSourceSpecSerde() {
+
+        Map<String, String> options1 = new HashMap<>();
+        options1.put("connector", "filesystem");
+        options1.put("format", "testcsv");
+        options1.put("path", "/tmp");
+
+        final ResolvedSchema resolvedSchema1 =
+                new ResolvedSchema(
+                        Collections.singletonList(Column.physical("a", 
DataTypes.BIGINT())),
+                        Collections.emptyList(),
+                        null,
+                        Collections.emptyList());
+
+        final CatalogTable catalogTable1 =
+                CatalogTable.newBuilder()
+                        
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema1).build())
+                        .options(options1)
+                        .build();
+
+        ResolvedCatalogTable resolvedCatalogTable =
+                new ResolvedCatalogTable(catalogTable1, resolvedSchema1);
+
+        RelDataType relDataType1 = FACTORY.createSqlType(SqlTypeName.BIGINT);
+
+        TableSourceTable tableSourceTable1 =
+                new TableSourceTable(
+                        null,
+                        relDataType1,
+                        FlinkStatistic.UNKNOWN(),
+                        new 
TestValuesTableFactory.MockedVectorSearchTableSource(),
+                        true,
+                        ContextResolvedTable.temporary(
+                                ObjectIdentifier.of("default_catalog", 
"default_db", "MyTable"),
+                                resolvedCatalogTable),
+                        FLINK_CONTEXT,
+                        FACTORY,
+                        new SourceAbilitySpec[] {new LimitPushDownSpec(100)});
+        return Stream.of(new VectorSearchTableSourceSpec(tableSourceTable1));
+    }
+
+    @ParameterizedTest
+    @MethodSource("testVectorSearchTableSourceSpecSerde")
+    public void testTemporalTableSourceSpecSerde(VectorSearchTableSourceSpec 
spec)
+            throws IOException {
+        CatalogManager catalogManager = 
CatalogManagerMocks.createEmptyCatalogManager();
+        catalogManager.createTemporaryTable(
+                
spec.getTableSourceSpec().getContextResolvedTable().getResolvedTable(),
+                
spec.getTableSourceSpec().getContextResolvedTable().getIdentifier(),
+                false);
+
+        SerdeContext serdeCtx =
+                JsonSerdeTestUtil.configuredSerdeContext(catalogManager, 
TableConfig.getDefault());
+
+        String json = JsonSerdeTestUtil.toJson(serdeCtx, spec);
+        VectorSearchTableSourceSpec actual =
+                JsonSerdeTestUtil.toObject(serdeCtx, json, 
VectorSearchTableSourceSpec.class);
+        assertThat(actual.getTableSourceSpec().getContextResolvedTable())
+                
.isEqualTo(spec.getTableSourceSpec().getContextResolvedTable());
+        assertThat(actual.getTableSourceSpec().getSourceAbilities())
+                .isEqualTo(spec.getTableSourceSpec().getSourceAbilities());
+        assertThat(actual.getOutputType()).isEqualTo(spec.getOutputType());
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchRestoreTest.java
new file mode 100644
index 00000000000..937d27baecc
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchRestoreTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.VectorSearchTestPrograms.ASYNC_VECTOR_SEARCH;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.VectorSearchTestPrograms.SYNC_VECTOR_SEARCH;
+
+/** Restore tests for {@link StreamExecVectorSearchTableFunction}. */
+public class VectorSearchRestoreTest extends RestoreTestBase {
+
+    public VectorSearchRestoreTest() {
+        super(StreamExecVectorSearchTableFunction.class);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Arrays.asList(SYNC_VECTOR_SEARCH, ASYNC_VECTOR_SEARCH);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchTestPrograms.java
new file mode 100644
index 00000000000..34265722782
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchTestPrograms.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecVectorSearchTableFunction}. */
+public class VectorSearchTestPrograms {
+
+    static final String[] SOURCE_SCHEMA =
+            new String[] {"id BIGINT", "content STRING", "vector 
ARRAY<FLOAT>"};
+
+    static final Row[] INPUT_BEFORE_DATA =
+            new Row[] {Row.of(1L, "Spark", new Float[] {5f, 12f, 13f})};
+
+    static final Row[] INPUT_AFTER_DATA =
+            new Row[] {Row.of(2L, "Flink", new Float[] {-5f, -12f, -13f})};
+
+    static final SourceTestStep SOURCE_TABLE =
+            SourceTestStep.newBuilder("src_t")
+                    .addSchema(SOURCE_SCHEMA)
+                    .producedBeforeRestore(INPUT_BEFORE_DATA)
+                    .producedAfterRestore(INPUT_AFTER_DATA)
+                    .build();
+
+    // 
-------------------------------------------------------------------------------------------
+
+    static final String[] VECTOR_TABLE_SCHEMA =
+            new String[] {"label STRING", "vector ARRAY<FLOAT>"};
+
+    static final Row[] VECTOR_TABLE_DATA =
+            new Row[] {
+                Row.of("Batch", new Float[] {5f, 12f, 13f}),
+                Row.of("Streaming", new Float[] {-5f, -12f, -13f}),
+                Row.of("Big Data", new Float[] {1f, 1f, 0f})
+            };
+
+    static final SourceTestStep ASYNC_VECTOR_TABLE =
+            SourceTestStep.newBuilder("async_vector_table")
+                    .addSchema(VECTOR_TABLE_SCHEMA)
+                    .addOption("async", "true")
+                    .addOption("enable-vector-search", "true")
+                    .producedBeforeRestore(VECTOR_TABLE_DATA)
+                    .producedAfterRestore(VECTOR_TABLE_DATA)
+                    .build();
+
+    static final SourceTestStep SYNC_VECTOR_TABLE =
+            SourceTestStep.newBuilder("sync_vector_table")
+                    .addSchema(VECTOR_TABLE_SCHEMA)
+                    .addOption("enable-vector-search", "true")
+                    .producedBeforeRestore(VECTOR_TABLE_DATA)
+                    .producedAfterRestore(VECTOR_TABLE_DATA)
+                    .build();
+
+    // 
-------------------------------------------------------------------------------------------
+
+    static final String[] SINK_SCHEMA =
+            new String[] {"id BIGINT", "content STRING", "label STRING"};
+
+    static final SinkTestStep SINK_TABLE =
+            SinkTestStep.newBuilder("sink_t")
+                    .addSchema(SINK_SCHEMA)
+                    .consumedBeforeRestore("+I[1, Spark, Batch]", "+I[1, 
Spark, Big Data]")
+                    .consumedAfterRestore("+I[2, Flink, Streaming]", "+I[2, 
Flink, Big Data]")
+                    .build();
+
+    // 
-------------------------------------------------------------------------------------------
+
+    public static final TableTestProgram SYNC_VECTOR_SEARCH =
+            TableTestProgram.of("sync-vector-search", "vector search in sync 
mode.")
+                    .setupTableSource(SOURCE_TABLE)
+                    .setupTableSource(SYNC_VECTOR_TABLE)
+                    .setupTableSink(SINK_TABLE)
+                    .runSql(
+                            "INSERT INTO sink_t SELECT id, content, label FROM 
src_t, LATERAL TABLE(\n"
+                                    + "VECTOR_SEARCH(TABLE sync_vector_table, 
DESCRIPTOR(vector), src_t.vector, 2))")
+                    .build();
+
+    public static final TableTestProgram ASYNC_VECTOR_SEARCH =
+            TableTestProgram.of("async-vector-search", "vector search in async 
mode.")
+                    .setupTableSource(SOURCE_TABLE)
+                    .setupTableSource(ASYNC_VECTOR_TABLE)
+                    .setupTableSink(SINK_TABLE)
+                    .runSql(
+                            "INSERT INTO sink_t SELECT id, content, label FROM 
src_t, LATERAL TABLE(\n"
+                                    + "VECTOR_SEARCH(TABLE async_vector_table, 
DESCRIPTOR(vector), src_t.vector, 2))")
+                    .build();
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json
new file mode 100644
index 00000000000..31c0de74713
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json
@@ -0,0 +1,175 @@
+{
+  "flinkVersion" : "2.2",
+  "nodes" : [ {
+    "id" : 5,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`src_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "id",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "content",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "vector",
+              "dataType" : "ARRAY<FLOAT>"
+            } ]
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector` ARRAY<FLOAT>>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, src_t]], fields=[id, content, vector])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-vector-search-table-function_1",
+    "configuration" : {
+      "table.exec.async-vector-search.max-concurrent-operations" : "10",
+      "table.exec.async-vector-search.output-mode" : "ORDERED",
+      "table.exec.async-vector-search.timeout" : "3 min"
+    },
+    "tableSourceSpec" : {
+      "vectorSearchTableSource" : {
+        "table" : {
+          "identifier" : "`default_catalog`.`default_database`.`async_vector_table`",
+          "resolvedTable" : {
+            "schema" : {
+              "columns" : [ {
+                "name" : "label",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "vector",
+                "dataType" : "ARRAY<FLOAT>"
+              } ]
+            }
+          }
+        }
+      },
+      "outputType" : "ROW<`label` VARCHAR(2147483647), `vector` ARRAY<FLOAT>> NOT NULL"
+    },
+    "vectorSearchSpec" : {
+      "joinType" : "INNER",
+      "searchColumns" : {
+        "1" : {
+          "type" : "FieldRef",
+          "index" : 2
+        }
+      },
+      "topK" : {
+        "type" : "Constant",
+        "sourceType" : "INT NOT NULL",
+        "literal" : {
+          "kind" : "LITERAL",
+          "value" : 2,
+          "type" : "INT NOT NULL"
+        }
+      },
+      "outputType" : "ROW<`label` VARCHAR(2147483647), `vector` ARRAY<FLOAT>, `score` DOUBLE NOT NULL>"
+    },
+    "asyncOptions" : {
+      "capacity " : 10,
+      "timeout" : 180000,
+      "output-mode" : "ORDERED"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector` ARRAY<FLOAT>, `label` VARCHAR(2147483647), `vector0` ARRAY<FLOAT>, `score` DOUBLE NOT NULL>",
+    "description" : "VectorSearchTableFunction(table=[default_catalog.default_database.async_vector_table], joinType=[InnerJoin], columnToSearch=[vector], columnToQuery=[vector], topK=[2], select=[id, content, vector, label, vector, score])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `label` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[id, content, label])"
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "id",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "content",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "label",
+              "dataType" : "VARCHAR(2147483647)"
+            } ]
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `label` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[id, content, label])"
+  } ],
+  "edges" : [ {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/async-vector-search/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/async-vector-search/savepoint/_metadata
new file mode 100644
index 00000000000..d4ef91b4218
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/async-vector-search/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json
new file mode 100644
index 00000000000..82d63664594
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json
@@ -0,0 +1,171 @@
+{
+  "flinkVersion" : "2.2",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`src_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "id",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "content",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "vector",
+              "dataType" : "ARRAY<FLOAT>"
+            } ]
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector` ARRAY<FLOAT>>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, src_t]], fields=[id, content, vector])"
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-vector-search-table-function_1",
+    "configuration" : {
+      "table.exec.async-vector-search.max-concurrent-operations" : "10",
+      "table.exec.async-vector-search.output-mode" : "ORDERED",
+      "table.exec.async-vector-search.timeout" : "3 min"
+    },
+    "tableSourceSpec" : {
+      "vectorSearchTableSource" : {
+        "table" : {
+          "identifier" : "`default_catalog`.`default_database`.`sync_vector_table`",
+          "resolvedTable" : {
+            "schema" : {
+              "columns" : [ {
+                "name" : "label",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "vector",
+                "dataType" : "ARRAY<FLOAT>"
+              } ]
+            }
+          }
+        }
+      },
+      "outputType" : "ROW<`label` VARCHAR(2147483647), `vector` ARRAY<FLOAT>> NOT NULL"
+    },
+    "vectorSearchSpec" : {
+      "joinType" : "INNER",
+      "searchColumns" : {
+        "1" : {
+          "type" : "FieldRef",
+          "index" : 2
+        }
+      },
+      "topK" : {
+        "type" : "Constant",
+        "sourceType" : "INT NOT NULL",
+        "literal" : {
+          "kind" : "LITERAL",
+          "value" : 2,
+          "type" : "INT NOT NULL"
+        }
+      },
+      "outputType" : "ROW<`label` VARCHAR(2147483647), `vector` ARRAY<FLOAT>, `score` DOUBLE NOT NULL>"
+    },
+    "asyncOptions" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector` ARRAY<FLOAT>, `label` VARCHAR(2147483647), `vector0` ARRAY<FLOAT>, `score` DOUBLE NOT NULL>",
+    "description" : "VectorSearchTableFunction(table=[default_catalog.default_database.sync_vector_table], joinType=[InnerJoin], columnToSearch=[vector], columnToQuery=[vector], topK=[2], select=[id, content, vector, label, vector, score])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `label` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[id, content, label])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "id",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "content",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "label",
+              "dataType" : "VARCHAR(2147483647)"
+            } ]
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `label` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[id, content, label])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/sync-vector-search/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/sync-vector-search/savepoint/_metadata
new file mode 100644
index 00000000000..7e661e5387e
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/sync-vector-search/savepoint/_metadata differ


Reply via email to