This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 64ce0c28363 [FLINK-38427][table] Support to serde VECTOR_SEARCH exec
node (#27127)
64ce0c28363 is described below
commit 64ce0c283633fed3715558d59d9de9e411289718
Author: Shengkai <[email protected]>
AuthorDate: Thu Oct 23 15:48:01 2025 +0800
[FLINK-38427][table] Support to serde VECTOR_SEARCH exec node (#27127)
---
.../exec/spec/VectorSearchTableSourceSpec.java | 59 ++++++-
.../StreamExecVectorSearchTableFunction.java | 56 ++++++-
.../planner/plan/utils/ExecNodeMetadataUtil.java | 2 +-
.../planner/factories/TestValuesTableFactory.java | 24 +++
.../VectorSearchTableSourceSpecSerdeTest.java | 132 ++++++++++++++++
.../nodes/exec/stream/VectorSearchRestoreTest.java | 41 +++++
.../exec/stream/VectorSearchTestPrograms.java | 107 +++++++++++++
.../plan/async-vector-search.json | 175 +++++++++++++++++++++
.../async-vector-search/savepoint/_metadata | Bin 0 -> 10882 bytes
.../plan/sync-vector-search.json | 171 ++++++++++++++++++++
.../sync-vector-search/savepoint/_metadata | Bin 0 -> 8385 bytes
11 files changed, 758 insertions(+), 9 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/VectorSearchTableSourceSpec.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/VectorSearchTableSourceSpec.java
index fd5a032d390..ba3fefd1379 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/VectorSearchTableSourceSpec.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/VectorSearchTableSourceSpec.java
@@ -18,11 +18,24 @@
package org.apache.flink.table.planner.plan.nodes.exec.spec;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.VectorSearchTableSource;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.type.RelDataType;
+import javax.annotation.Nullable;
+
import java.util.Arrays;
/**
@@ -30,11 +43,19 @@ import java.util.Arrays;
*
* <p>This class corresponds to {@link RelOptTable} rel node.
*/
+@JsonIgnoreProperties(ignoreUnknown = true)
public class VectorSearchTableSourceSpec {
+ public static final String FIELD_NAME_VECTOR_SEARCH_TABLE_SOURCE =
"vectorSearchTableSource";
+ public static final String FIELD_NAME_OUTPUT_TYPE = "outputType";
+
+ @JsonProperty(FIELD_NAME_VECTOR_SEARCH_TABLE_SOURCE)
private final DynamicTableSourceSpec tableSourceSpec;
+
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE)
private final RelDataType outputType;
- private final TableSourceTable searchTable;
+
+ @JsonIgnore private @Nullable TableSourceTable searchTable;
public VectorSearchTableSourceSpec(TableSourceTable searchTable) {
this.searchTable = searchTable;
@@ -45,8 +66,40 @@ public class VectorSearchTableSourceSpec {
Arrays.asList(searchTable.abilitySpecs()));
}
- public TableSourceTable getSearchTable() {
- return searchTable;
+ @JsonCreator
+ public VectorSearchTableSourceSpec(
+ @JsonProperty(FIELD_NAME_VECTOR_SEARCH_TABLE_SOURCE)
+ DynamicTableSourceSpec tableSourceSpec,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RelDataType outputType) {
+ this.tableSourceSpec = tableSourceSpec;
+ this.outputType = outputType;
+ }
+
+ @JsonIgnore
+ public TableSourceTable getSearchTable(FlinkContext context,
FlinkTypeFactory typeFactory) {
+ if (null != searchTable) {
+ return searchTable;
+ }
+ if (null != tableSourceSpec && null != outputType) {
+ VectorSearchTableSource vectorSearchTableSource =
+ tableSourceSpec.getVectorSearchTableSource(context,
typeFactory);
+ SourceAbilitySpec[] sourceAbilitySpecs = null;
+ if (null != tableSourceSpec.getSourceAbilities()) {
+ sourceAbilitySpecs =
+ tableSourceSpec.getSourceAbilities().toArray(new
SourceAbilitySpec[0]);
+ }
+ return new TableSourceTable(
+ null,
+ outputType,
+ FlinkStatistic.UNKNOWN(),
+ vectorSearchTableSource,
+ true,
+ tableSourceSpec.getContextResolvedTable(),
+ context,
+ typeFactory,
+ sourceAbilitySpecs);
+ }
+ throw new TableException("Can not obtain searchTable correctly!");
}
public DynamicTableSourceSpec getTableSourceSpec() {
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
index c0072e67f9e..a411452c099 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;
+import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.PipelineOptions;
@@ -47,6 +48,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import
org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.VectorSearchSpec;
@@ -65,6 +67,9 @@ import
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.core.JoinRelType;
@@ -72,33 +77,73 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
/** Stream {@link ExecNode} for {@code VECTOR_SEARCH}. */
+@ExecNodeMetadata(
+ name = "stream-exec-vector-search-table-function",
+ version = 1,
+ consumedOptions = {
+ "table.exec.async-vector-search.max-concurrent-operations",
+ "table.exec.async-vector-search.timeout",
+ "table.exec.async-vector-search.output-mode"
+ },
+ // Refers to this node's own transformation name constant, not the ML_PREDICT one.
+ producedTransformations =
StreamExecVectorSearchTableFunction.VECTOR_SEARCH_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v2_2,
+ minStateVersion = FlinkVersion.v2_2)
public class StreamExecVectorSearchTableFunction extends ExecNodeBase<RowData>
implements MultipleTransformationTranslator<RowData>,
StreamExecNode<RowData> {
public static final String VECTOR_SEARCH_TRANSFORMATION =
"vector-search-table-function";
- private final VectorSearchTableSourceSpec vectorSearchTableSourceSpec;
+
+ private static final String FIELD_NAME_TABLE_SOURCE_SPEC =
"tableSourceSpec";
+ private static final String FIELD_NAME_VECTOR_SEARCH_SPEC =
"vectorSearchSpec";
+ private static final String FIELD_NAME_ASYNC_OPTIONS = "asyncOptions";
+
+ @JsonProperty(FIELD_NAME_TABLE_SOURCE_SPEC)
+ private final VectorSearchTableSourceSpec tableSourceSpec;
+
+ @JsonProperty(FIELD_NAME_VECTOR_SEARCH_SPEC)
private final VectorSearchSpec vectorSearchSpec;
+
+ @JsonProperty(FIELD_NAME_ASYNC_OPTIONS)
private final @Nullable FunctionCallUtil.AsyncOptions asyncOptions;
public StreamExecVectorSearchTableFunction(
ReadableConfig tableConfig,
- VectorSearchTableSourceSpec vectorSearchTableSourceSpec,
+ VectorSearchTableSourceSpec tableSourceSpec,
VectorSearchSpec vectorSearchSpec,
@Nullable FunctionCallUtil.AsyncOptions asyncOptions,
InputProperty inputProperty,
RowType outputType,
String description) {
- super(
+ this(
ExecNodeContext.newNodeId(),
ExecNodeContext.newContext(StreamExecVectorSearchTableFunction.class),
ExecNodeContext.newPersistedConfig(
StreamExecVectorSearchTableFunction.class,
tableConfig),
+ tableSourceSpec,
+ vectorSearchSpec,
+ asyncOptions,
Collections.singletonList(inputProperty),
outputType,
description);
- this.vectorSearchTableSourceSpec = vectorSearchTableSourceSpec;
+ }
+
+ @JsonCreator
+ public StreamExecVectorSearchTableFunction(
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig
persistedConfig,
+ @JsonProperty(FIELD_NAME_TABLE_SOURCE_SPEC)
VectorSearchTableSourceSpec tableSourceSpec,
+ @JsonProperty(FIELD_NAME_VECTOR_SEARCH_SPEC) VectorSearchSpec
vectorSearchSpec,
+ @JsonProperty(FIELD_NAME_ASYNC_OPTIONS) @Nullable
+ FunctionCallUtil.AsyncOptions asyncOptions,
+ @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty>
inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+ super(id, context, persistedConfig, inputProperties, outputType,
description);
+ this.tableSourceSpec = tableSourceSpec;
this.vectorSearchSpec = vectorSearchSpec;
this.asyncOptions = asyncOptions;
}
@@ -111,7 +156,8 @@ public class StreamExecVectorSearchTableFunction extends
ExecNodeBase<RowData>
Transformation<RowData> inputTransformation =
(Transformation<RowData>) inputEdge.translateToPlan(planner);
// 2. extract search function
- TableSourceTable searchTable =
vectorSearchTableSourceSpec.getSearchTable();
+ TableSourceTable searchTable =
+ tableSourceSpec.getSearchTable(planner.getFlinkContext(),
planner.getTypeFactory());
boolean isAsyncEnabled = asyncOptions != null;
UserDefinedFunction vectorSearchFunction =
findVectorSearchFunction(
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index f860d7104cc..cd24cc9c82b 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -179,6 +179,7 @@ public final class ExecNodeMetadataUtil {
add(StreamExecPythonOverAggregate.class);
add(StreamExecMLPredictTableFunction.class);
add(StreamExecDeltaJoin.class);
+ add(StreamExecVectorSearchTableFunction.class);
// Batch execution mode
add(BatchExecSink.class);
add(BatchExecTableSourceScan.class);
@@ -222,7 +223,6 @@ public final class ExecNodeMetadataUtil {
add(StreamExecGroupTableAggregate.class);
add(StreamExecPythonGroupTableAggregate.class);
add(StreamExecMultipleInput.class);
- add(StreamExecVectorSearchTableFunction.class);
}
};
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index cebde11ac66..79387ab9285 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -2215,6 +2215,30 @@ public final class TestValuesTableFactory
}
}
+ /** A mocked {@link VectorSearchTableSource} for validation test. */
+ public static class MockedVectorSearchTableSource implements
VectorSearchTableSource {
+ @Override
+ public VectorSearchRuntimeProvider
getSearchRuntimeProvider(VectorSearchContext context) {
+ return VectorSearchFunctionProvider.of(
+ new VectorSearchFunction() {
+ @Override
+ public Collection<RowData> vectorSearch(int topK,
RowData queryData) {
+ return Collections.emptyList();
+ }
+ });
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "MockedVectorSearchSource";
+ }
+ }
+
/**
* Values {@link ScanTableSource} which collects the registered {@link
RowData} directly, sleeps
* {@link #sleepTimeMillis} every {@link #sleepAfterElements} elements.
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/VectorSearchTableSourceSpecSerdeTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/VectorSearchTableSourceSpecSerdeTest.java
new file mode 100644
index 00000000000..8604882b7de
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/VectorSearchTableSourceSpecSerdeTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.plan.abilities.source.LimitPushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import
org.apache.flink.table.planner.plan.nodes.exec.spec.VectorSearchTableSourceSpec;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
+
+/** Tests for {@link VectorSearchTableSourceSpec} serialization and
deserialization. */
+@Execution(CONCURRENT)
+public class VectorSearchTableSourceSpecSerdeTest {
+
+ // Type factory used to build the RelDataType fixtures; uses this test's own class loader.
+ private static final FlinkTypeFactory FACTORY =
+ new FlinkTypeFactory(
+ VectorSearchTableSourceSpecSerdeTest.class.getClassLoader(),
+ FlinkTypeSystem.INSTANCE);
+
+ private static final FlinkContext FLINK_CONTEXT =
+ JsonSerdeTestUtil.configuredSerdeContext().getFlinkContext();
+
+ public static Stream<VectorSearchTableSourceSpec>
testVectorSearchTableSourceSpecSerde() {
+
+ Map<String, String> options1 = new HashMap<>();
+ options1.put("connector", "filesystem");
+ options1.put("format", "testcsv");
+ options1.put("path", "/tmp");
+
+ final ResolvedSchema resolvedSchema1 =
+ new ResolvedSchema(
+ Collections.singletonList(Column.physical("a",
DataTypes.BIGINT())),
+ Collections.emptyList(),
+ null,
+ Collections.emptyList());
+
+ final CatalogTable catalogTable1 =
+ CatalogTable.newBuilder()
+
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema1).build())
+ .options(options1)
+ .build();
+
+ ResolvedCatalogTable resolvedCatalogTable =
+ new ResolvedCatalogTable(catalogTable1, resolvedSchema1);
+
+ RelDataType relDataType1 = FACTORY.createSqlType(SqlTypeName.BIGINT);
+
+ TableSourceTable tableSourceTable1 =
+ new TableSourceTable(
+ null,
+ relDataType1,
+ FlinkStatistic.UNKNOWN(),
+ new
TestValuesTableFactory.MockedVectorSearchTableSource(),
+ true,
+ ContextResolvedTable.temporary(
+ ObjectIdentifier.of("default_catalog",
"default_db", "MyTable"),
+ resolvedCatalogTable),
+ FLINK_CONTEXT,
+ FACTORY,
+ new SourceAbilitySpec[] {new LimitPushDownSpec(100)});
+ return Stream.of(new VectorSearchTableSourceSpec(tableSourceTable1));
+ }
+
+ @ParameterizedTest
+ @MethodSource("testVectorSearchTableSourceSpecSerde")
+ public void testTemporalTableSourceSpecSerde(VectorSearchTableSourceSpec
spec)
+ throws IOException {
+ CatalogManager catalogManager =
CatalogManagerMocks.createEmptyCatalogManager();
+ catalogManager.createTemporaryTable(
+
spec.getTableSourceSpec().getContextResolvedTable().getResolvedTable(),
+
spec.getTableSourceSpec().getContextResolvedTable().getIdentifier(),
+ false);
+
+ SerdeContext serdeCtx =
+ JsonSerdeTestUtil.configuredSerdeContext(catalogManager,
TableConfig.getDefault());
+
+ String json = JsonSerdeTestUtil.toJson(serdeCtx, spec);
+ VectorSearchTableSourceSpec actual =
+ JsonSerdeTestUtil.toObject(serdeCtx, json,
VectorSearchTableSourceSpec.class);
+ assertThat(actual.getTableSourceSpec().getContextResolvedTable())
+
.isEqualTo(spec.getTableSourceSpec().getContextResolvedTable());
+ assertThat(actual.getTableSourceSpec().getSourceAbilities())
+ .isEqualTo(spec.getTableSourceSpec().getSourceAbilities());
+ assertThat(actual.getOutputType()).isEqualTo(spec.getOutputType());
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchRestoreTest.java
new file mode 100644
index 00000000000..937d27baecc
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchRestoreTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static
org.apache.flink.table.planner.plan.nodes.exec.stream.VectorSearchTestPrograms.ASYNC_VECTOR_SEARCH;
+import static
org.apache.flink.table.planner.plan.nodes.exec.stream.VectorSearchTestPrograms.SYNC_VECTOR_SEARCH;
+
+/** Restore tests for {@link StreamExecVectorSearchTableFunction}. */
+public class VectorSearchRestoreTest extends RestoreTestBase {
+
+ public VectorSearchRestoreTest() {
+ super(StreamExecVectorSearchTableFunction.class);
+ }
+
+ @Override
+ public List<TableTestProgram> programs() {
+ return Arrays.asList(SYNC_VECTOR_SEARCH, ASYNC_VECTOR_SEARCH);
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchTestPrograms.java
new file mode 100644
index 00000000000..34265722782
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchTestPrograms.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link
StreamExecVectorSearchTableFunction}. */
+public class VectorSearchTestPrograms {
+
+ static final String[] SOURCE_SCHEMA =
+ new String[] {"id BIGINT", "content STRING", "vector
ARRAY<FLOAT>"};
+
+ static final Row[] INPUT_BEFORE_DATA =
+ new Row[] {Row.of(1L, "Spark", new Float[] {5f, 12f, 13f})};
+
+ static final Row[] INPUT_AFTER_DATA =
+ new Row[] {Row.of(2L, "Flink", new Float[] {-5f, -12f, -13f})};
+
+ static final SourceTestStep SOURCE_TABLE =
+ SourceTestStep.newBuilder("src_t")
+ .addSchema(SOURCE_SCHEMA)
+ .producedBeforeRestore(INPUT_BEFORE_DATA)
+ .producedAfterRestore(INPUT_AFTER_DATA)
+ .build();
+
+ //
-------------------------------------------------------------------------------------------
+
+ static final String[] VECTOR_TABLE_SCHEMA =
+ new String[] {"label STRING", "vector ARRAY<FLOAT>"};
+
+ static final Row[] VECTOR_TABLE_DATA =
+ new Row[] {
+ Row.of("Batch", new Float[] {5f, 12f, 13f}),
+ Row.of("Streaming", new Float[] {-5f, -12f, -13f}),
+ Row.of("Big Data", new Float[] {1f, 1f, 0f})
+ };
+
+ static final SourceTestStep ASYNC_VECTOR_TABLE =
+ SourceTestStep.newBuilder("async_vector_table")
+ .addSchema(VECTOR_TABLE_SCHEMA)
+ .addOption("async", "true")
+ .addOption("enable-vector-search", "true")
+ .producedBeforeRestore(VECTOR_TABLE_DATA)
+ .producedAfterRestore(VECTOR_TABLE_DATA)
+ .build();
+
+ static final SourceTestStep SYNC_VECTOR_TABLE =
+ SourceTestStep.newBuilder("sync_vector_table")
+ .addSchema(VECTOR_TABLE_SCHEMA)
+ .addOption("enable-vector-search", "true")
+ .producedBeforeRestore(VECTOR_TABLE_DATA)
+ .producedAfterRestore(VECTOR_TABLE_DATA)
+ .build();
+
+ //
-------------------------------------------------------------------------------------------
+
+ static final String[] SINK_SCHEMA =
+ new String[] {"id BIGINT", "content STRING", "label STRING"};
+
+ static final SinkTestStep SINK_TABLE =
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema(SINK_SCHEMA)
+ .consumedBeforeRestore("+I[1, Spark, Batch]", "+I[1,
Spark, Big Data]")
+ .consumedAfterRestore("+I[2, Flink, Streaming]", "+I[2,
Flink, Big Data]")
+ .build();
+
+ //
-------------------------------------------------------------------------------------------
+
+ public static final TableTestProgram SYNC_VECTOR_SEARCH =
+ TableTestProgram.of("sync-vector-search", "vector search in sync
mode.")
+ .setupTableSource(SOURCE_TABLE)
+ .setupTableSource(SYNC_VECTOR_TABLE)
+ .setupTableSink(SINK_TABLE)
+ .runSql(
+ "INSERT INTO sink_t SELECT id, content, label FROM
src_t, LATERAL TABLE(\n"
+ + "VECTOR_SEARCH(TABLE sync_vector_table,
DESCRIPTOR(vector), src_t.vector, 2))")
+ .build();
+
+ public static final TableTestProgram ASYNC_VECTOR_SEARCH =
+ TableTestProgram.of("async-vector-search", "vector search in async
mode.")
+ .setupTableSource(SOURCE_TABLE)
+ .setupTableSource(ASYNC_VECTOR_TABLE)
+ .setupTableSink(SINK_TABLE)
+ .runSql(
+ "INSERT INTO sink_t SELECT id, content, label FROM
src_t, LATERAL TABLE(\n"
+ + "VECTOR_SEARCH(TABLE async_vector_table,
DESCRIPTOR(vector), src_t.vector, 2))")
+ .build();
+}
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json
new file mode 100644
index 00000000000..31c0de74713
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/async-vector-search/plan/async-vector-search.json
@@ -0,0 +1,175 @@
+{
+ "flinkVersion" : "2.2",
+ "nodes" : [ {
+ "id" : 5,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`src_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "id",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "content",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "vector",
+ "dataType" : "ARRAY<FLOAT>"
+ } ]
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector`
ARRAY<FLOAT>>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, src_t]], fields=[id, content, vector])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-vector-search-table-function_1",
+ "configuration" : {
+ "table.exec.async-vector-search.max-concurrent-operations" : "10",
+ "table.exec.async-vector-search.output-mode" : "ORDERED",
+ "table.exec.async-vector-search.timeout" : "3 min"
+ },
+ "tableSourceSpec" : {
+ "vectorSearchTableSource" : {
+ "table" : {
+ "identifier" :
"`default_catalog`.`default_database`.`async_vector_table`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "label",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "vector",
+ "dataType" : "ARRAY<FLOAT>"
+ } ]
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`label` VARCHAR(2147483647), `vector` ARRAY<FLOAT>>
NOT NULL"
+ },
+ "vectorSearchSpec" : {
+ "joinType" : "INNER",
+ "searchColumns" : {
+ "1" : {
+ "type" : "FieldRef",
+ "index" : 2
+ }
+ },
+ "topK" : {
+ "type" : "Constant",
+ "sourceType" : "INT NOT NULL",
+ "literal" : {
+ "kind" : "LITERAL",
+ "value" : 2,
+ "type" : "INT NOT NULL"
+ }
+ },
+ "outputType" : "ROW<`label` VARCHAR(2147483647), `vector` ARRAY<FLOAT>,
`score` DOUBLE NOT NULL>"
+ },
+ "asyncOptions" : {
+ "capacity " : 10,
+ "timeout" : 180000,
+ "output-mode" : "ORDERED"
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector`
ARRAY<FLOAT>, `label` VARCHAR(2147483647), `vector0` ARRAY<FLOAT>, `score`
DOUBLE NOT NULL>",
+ "description" :
"VectorSearchTableFunction(table=[default_catalog.default_database.async_vector_table],
joinType=[InnerJoin], columnToSearch=[vector], columnToQuery=[vector],
topK=[2], select=[id, content, vector, label, vector, score])"
+ }, {
+ "id" : 7,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `label`
VARCHAR(2147483647)>",
+ "description" : "Calc(select=[id, content, label])"
+ }, {
+ "id" : 8,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "id",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "content",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "label",
+ "dataType" : "VARCHAR(2147483647)"
+ } ]
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `label`
VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[id, content, label])"
+ } ],
+ "edges" : [ {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 7,
+ "target" : 8,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/async-vector-search/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/async-vector-search/savepoint/_metadata
new file mode 100644
index 00000000000..d4ef91b4218
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/async-vector-search/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json
new file mode 100644
index 00000000000..82d63664594
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/sync-vector-search/plan/sync-vector-search.json
@@ -0,0 +1,171 @@
+{
+ "flinkVersion" : "2.2",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`src_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "id",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "content",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "vector",
+ "dataType" : "ARRAY<FLOAT>"
+ } ]
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector`
ARRAY<FLOAT>>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, src_t]], fields=[id, content, vector])"
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-vector-search-table-function_1",
+ "configuration" : {
+ "table.exec.async-vector-search.max-concurrent-operations" : "10",
+ "table.exec.async-vector-search.output-mode" : "ORDERED",
+ "table.exec.async-vector-search.timeout" : "3 min"
+ },
+ "tableSourceSpec" : {
+ "vectorSearchTableSource" : {
+ "table" : {
+ "identifier" :
"`default_catalog`.`default_database`.`sync_vector_table`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "label",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "vector",
+ "dataType" : "ARRAY<FLOAT>"
+ } ]
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`label` VARCHAR(2147483647), `vector` ARRAY<FLOAT>>
NOT NULL"
+ },
+ "vectorSearchSpec" : {
+ "joinType" : "INNER",
+ "searchColumns" : {
+ "1" : {
+ "type" : "FieldRef",
+ "index" : 2
+ }
+ },
+ "topK" : {
+ "type" : "Constant",
+ "sourceType" : "INT NOT NULL",
+ "literal" : {
+ "kind" : "LITERAL",
+ "value" : 2,
+ "type" : "INT NOT NULL"
+ }
+ },
+ "outputType" : "ROW<`label` VARCHAR(2147483647), `vector` ARRAY<FLOAT>,
`score` DOUBLE NOT NULL>"
+ },
+ "asyncOptions" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector`
ARRAY<FLOAT>, `label` VARCHAR(2147483647), `vector0` ARRAY<FLOAT>, `score`
DOUBLE NOT NULL>",
+ "description" :
"VectorSearchTableFunction(table=[default_catalog.default_database.sync_vector_table],
joinType=[InnerJoin], columnToSearch=[vector], columnToQuery=[vector],
topK=[2], select=[id, content, vector, label, vector, score])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `label`
VARCHAR(2147483647)>",
+ "description" : "Calc(select=[id, content, label])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "id",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "content",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "label",
+ "dataType" : "VARCHAR(2147483647)"
+ } ]
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `label`
VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[id, content, label])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/sync-vector-search/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/sync-vector-search/savepoint/_metadata
new file mode 100644
index 00000000000..7e661e5387e
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/sync-vector-search/savepoint/_metadata
differ