This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 58035ea658e [FLINK-38423][table-api] Add VECTOR_SEARCH connector API
(#27037)
58035ea658e is described below
commit 58035ea658e8bb3c0db5128280f958f1e572ad4a
Author: Shengkai <[email protected]>
AuthorDate: Tue Oct 14 14:34:09 2025 +0800
[FLINK-38423][table-api] Add VECTOR_SEARCH connector API (#27037)
---
.../connector/source/VectorSearchTableSource.java | 118 +++++++++++++++++++++
.../search/AsyncVectorSearchFunctionProvider.java | 38 +++++++
.../search/VectorSearchFunctionProvider.java | 37 +++++++
.../table/functions/AsyncVectorSearchFunction.java | 69 ++++++++++++
.../table/functions/VectorSearchFunction.java | 65 ++++++++++++
5 files changed, 327 insertions(+)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/VectorSearchTableSource.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/VectorSearchTableSource.java
new file mode 100644
index 00000000000..0e7e6279bb1
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/VectorSearchTableSource.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import
org.apache.flink.table.connector.source.search.AsyncVectorSearchFunctionProvider;
+import
org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider;
+import org.apache.flink.types.RowKind;
+
+import java.io.Serializable;
+
+/**
+ * A {@link DynamicTableSource} that searches rows of an external storage
+ * system by one or more vectors during runtime.
+ *
+ * <p>Compared to {@link ScanTableSource}, the source does not have to read
+ * the entire table and can lazily fetch individual values from a (possibly
+ * continuously changing) external table when necessary.
+ *
+ * <p>Note: Compared to {@link ScanTableSource}, a
+ * {@link VectorSearchTableSource} only supports emitting insert-only changes
+ * (see also {@link RowKind}).
+ *
+ * <p>In the last step, the planner will call
+ * {@link #getSearchRuntimeProvider(VectorSearchContext)} to obtain a provider
+ * of runtime implementation. The search fields that are required to perform
+ * a search are derived from a query by the planner and will be provided in
+ * the given
+ * {@link VectorSearchTableSource.VectorSearchContext#getSearchColumns()}.
+ * The values for those search fields are passed at runtime.
+ */
+@PublicEvolving
+public interface VectorSearchTableSource extends DynamicTableSource {
+
+ /**
+ * Returns a {@link VectorSearchRuntimeProvider}.
+ *
+ * <p>{@code VectorSearchRuntimeProvider} is a base interface that is
+ * extended by child interfaces for specialized vector searches. There
+ * exist different interfaces for runtime implementation, which is why
+ * {@link VectorSearchRuntimeProvider} serves as the base interface.
+ *
+ * <p>Independent of the provider interface, a source implementation can
+ * work on either arbitrary objects or internal data structures (see
+ * {@link org.apache.flink.table.data} for more information).
+ *
+ * <p>The given {@link VectorSearchContext} offers utilities for the
+ * planner to create runtime implementation with minimal dependencies to
+ * internal data structures.
+ *
+ * @param context the context that exposes the search columns and the
+ *     runtime config for creating the runtime implementation
+ * @return the provider of the runtime implementation, for example a
+ *     {@link VectorSearchFunctionProvider} or an
+ *     {@link AsyncVectorSearchFunctionProvider}
+ * @see VectorSearchFunctionProvider
+ * @see AsyncVectorSearchFunctionProvider
+ */
+ VectorSearchRuntimeProvider getSearchRuntimeProvider(VectorSearchContext
context);
+
+ //
--------------------------------------------------------------------------------------------
+ // Helper interfaces
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Context for creating runtime implementation via a
+ * {@link VectorSearchRuntimeProvider}.
+ *
+ * <p>It offers utilities for the planner to create runtime implementation
+ * with minimal dependencies to internal data structures.
+ *
+ * <p>Methods should be called in
+ * {@link #getSearchRuntimeProvider(VectorSearchContext)}. Returned
+ * instances that are {@link Serializable} can be directly passed into the
+ * runtime implementation class.
+ */
+ @PublicEvolving
+ interface VectorSearchContext extends DynamicTableSource.Context {
+
+ /**
+ * Returns an array of column index paths that should be used during the
+ * search. The indices are 0-based and support composite keys within
+ * (possibly nested) structures.
+ *
+ * <p>For example, given a table with data type
+ * {@code ROW < i INT, s STRING, r ROW < i2 INT, s2 STRING > >}, this
+ * method would return {@code [[0], [2, 1]]} when {@code i} and
+ * {@code s2} are used for performing a search.
+ *
+ * @return array of search column index paths
+ */
+ int[][] getSearchColumns();
+
+ /**
+ * Runtime config provided to the provider. The config can be used by the
+ * planner or vector search provider at runtime. For example, async
+ * options can be used by the planner to choose async search. Other config
+ * such as http timeout or retry can be used to configure search
+ * functions.
+ *
+ * @return the readable runtime config
+ */
+ ReadableConfig runtimeConfig();
+ }
+
+ /**
+ * Provides actual runtime implementation for reading the data.
+ *
+ * <p>There exist different interfaces for runtime implementation, which
+ * is why {@link VectorSearchRuntimeProvider} serves as the base interface.
+ *
+ * @see VectorSearchFunctionProvider
+ * @see AsyncVectorSearchFunctionProvider
+ */
+ @PublicEvolving
+ interface VectorSearchRuntimeProvider {}
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/AsyncVectorSearchFunctionProvider.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/AsyncVectorSearchFunctionProvider.java
new file mode 100644
index 00000000000..9dd7a5083dc
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/AsyncVectorSearchFunctionProvider.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.search;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.VectorSearchTableSource;
+import org.apache.flink.table.functions.AsyncVectorSearchFunction;
+
+/**
+ * A {@link VectorSearchTableSource.VectorSearchRuntimeProvider} that creates
+ * the {@link AsyncVectorSearchFunction} used for asynchronous vector search.
+ */
+@PublicEvolving
+public interface AsyncVectorSearchFunctionProvider
+        extends VectorSearchTableSource.VectorSearchRuntimeProvider {
+
+    /** Creates an {@link AsyncVectorSearchFunction} instance. */
+    AsyncVectorSearchFunction createAsyncVectorSearchFunction();
+
+    /**
+     * Helper function for creating a static provider, i.e. a provider that
+     * returns the given function on every call.
+     *
+     * @param asyncVectorSearchFunction the function the provider returns
+     * @return a provider wrapping the given function
+     */
+    static AsyncVectorSearchFunctionProvider of(
+            AsyncVectorSearchFunction asyncVectorSearchFunction) {
+        final AsyncVectorSearchFunctionProvider staticProvider =
+                () -> asyncVectorSearchFunction;
+        return staticProvider;
+    }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/VectorSearchFunctionProvider.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/VectorSearchFunctionProvider.java
new file mode 100644
index 00000000000..fe50ad585df
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/VectorSearchFunctionProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.search;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.VectorSearchTableSource;
+import org.apache.flink.table.functions.VectorSearchFunction;
+
+/**
+ * A {@link VectorSearchTableSource.VectorSearchRuntimeProvider} that creates
+ * the {@link VectorSearchFunction} used for synchronous vector search.
+ */
+@PublicEvolving
+public interface VectorSearchFunctionProvider
+        extends VectorSearchTableSource.VectorSearchRuntimeProvider {
+
+    /** Creates a {@link VectorSearchFunction} instance. */
+    VectorSearchFunction createVectorSearchFunction();
+
+    /**
+     * Helper function for creating a static provider, i.e. a provider that
+     * returns the given function on every call.
+     *
+     * @param searchFunction the function the provider returns
+     * @return a provider wrapping the given function
+     */
+    static VectorSearchFunctionProvider of(VectorSearchFunction searchFunction) {
+        final VectorSearchFunctionProvider staticProvider = () -> searchFunction;
+        return staticProvider;
+    }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncVectorSearchFunction.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncVectorSearchFunction.java
new file mode 100644
index 00000000000..fdad0a50ecc
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncVectorSearchFunction.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A wrapper class of {@link AsyncTableFunction} for asynchronous vector
search.
+ *
+ * <p>The output type of this table function is fixed as {@link RowData}.
+ */
+@PublicEvolving
+public abstract class AsyncVectorSearchFunction extends
AsyncTableFunction<RowData> {
+
+ /**
+ * Asynchronously search result based on input row to find topK matched
rows.
+ *
+ * @param topK - The number of topK matched rows to return.
+ * @param queryData - A {@link RowData} that wraps input for search
function.
+ * @return A collection of all searched results.
+ */
+ public abstract CompletableFuture<Collection<RowData>> asyncVectorSearch(
+ int topK, RowData queryData);
+
+ /** Invokes {@link #asyncVectorSearch} and chains futures. */
+ public void eval(CompletableFuture<Collection<RowData>> future, Object...
args) {
+ int topK = (int) args[0];
+ GenericRowData argsData = new GenericRowData(args.length - 1);
+ for (int i = 1; i < args.length; ++i) {
+ argsData.setField(i, args[i]);
+ }
+ asyncVectorSearch(topK, argsData)
+ .whenComplete(
+ (result, exception) -> {
+ if (exception != null) {
+ future.completeExceptionally(
+ new TableException(
+ String.format(
+ "Failed to execute
asynchronously search with input row %s.",
+ argsData),
+ exception));
+ return;
+ }
+ future.complete(result);
+ });
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/VectorSearchFunction.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/VectorSearchFunction.java
new file mode 100644
index 00000000000..8f71e3f2776
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/VectorSearchFunction.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A wrapper class of {@link TableFunction} for synchronous vector search.
+ *
+ * <p>The output type of this table function is fixed as {@link RowData}.
+ */
+@PublicEvolving
+public abstract class VectorSearchFunction extends TableFunction<RowData> {
+
+    /**
+     * Synchronously search result based on input row to find topK matched
+     * rows.
+     *
+     * @param topK - The number of topK results to return.
+     * @param queryData - A {@link RowData} that wraps input for vector search
+     *     function.
+     * @return A collection of searched results.
+     * @throws IOException declared so that implementations can report I/O
+     *     failures of the search
+     */
+    public abstract Collection<RowData> vectorSearch(int topK, RowData queryData)
+            throws IOException;
+
+    /**
+     * Invoke {@link #vectorSearch} and handle exceptions.
+     *
+     * <p>Every returned row is emitted via {@code collect}; a {@code null}
+     * result emits nothing. Any exception thrown by the search is rethrown
+     * wrapped in a {@link FlinkRuntimeException}.
+     *
+     * @param args the call arguments: {@code args[0]} is the topK value and
+     *     the remaining elements are packed, in order, into the query row
+     */
+    public final void eval(Object... args) {
+        int topK = (int) args[0];
+        // args[0] is topK, so the query row has one field fewer than args and
+        // query field (i - 1) is taken from args[i]. Writing to index i would
+        // leave field 0 unset and overflow the row on the last argument.
+        GenericRowData argsData = new GenericRowData(args.length - 1);
+        for (int i = 1; i < args.length; ++i) {
+            argsData.setField(i - 1, args[i]);
+        }
+        try {
+            Collection<RowData> results = vectorSearch(topK, argsData);
+            if (results == null) {
+                return;
+            }
+            results.forEach(this::collect);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException(
+                    String.format("Failed to execute search with input row %s.", argsData), e);
+        }
+    }
+}