This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 52155584e17 [FLINK-38528][table] Introduce async vector search 
operator (#27126)
52155584e17 is described below

commit 52155584e17a33ee6ea0de3eb97dc4568d9c8173
Author: Shengkai <[email protected]>
AuthorDate: Wed Oct 22 17:14:42 2025 +0800

    [FLINK-38528][table] Introduce async vector search operator (#27126)
---
 .../StreamExecVectorSearchTableFunction.java       |  70 ++++++-
 .../codegen/VectorSearchCodeGenerator.scala        |  28 +++
 .../factories/TestValuesRuntimeFunctions.java      |  50 +++++
 .../planner/factories/TestValuesTableFactory.java  |  91 +++++++-
 .../stream/table/AsyncVectorSearchITCase.java      | 232 +++++++++++++++++++++
 .../operators/search/AsyncVectorSearchRunner.java  | 147 +++++++++++++
 6 files changed, 606 insertions(+), 12 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
index 9dcb90b2473..c0072e67f9e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
@@ -23,19 +23,23 @@ import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.connector.source.VectorSearchTableSource;
 import 
org.apache.flink.table.connector.source.search.AsyncVectorSearchFunctionProvider;
 import 
org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncVectorSearchFunction;
 import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.functions.VectorSearchFunction;
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.FunctionCallCodeGenerator;
 import org.apache.flink.table.planner.codegen.VectorSearchCodeGenerator;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
@@ -55,9 +59,11 @@ import org.apache.flink.table.planner.utils.ShortcutUtils;
 import org.apache.flink.table.runtime.collector.ListenableCollector;
 import org.apache.flink.table.runtime.generated.GeneratedCollector;
 import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.runtime.operators.search.AsyncVectorSearchRunner;
 import org.apache.flink.table.runtime.operators.search.VectorSearchRunner;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.core.JoinRelType;
@@ -116,17 +122,27 @@ public class StreamExecVectorSearchTableFunction extends 
ExecNodeBase<RowData>
         // 3. build the operator
         RowType inputType = (RowType) inputEdge.getOutputType();
         RowType outputType = (RowType) getOutputType();
+        DataTypeFactory dataTypeFactory =
+                ShortcutUtils.unwrapContext(planner.getFlinkContext())
+                        .getCatalogManager()
+                        .getDataTypeFactory();
         StreamOperatorFactory<RowData> operatorFactory =
                 isAsyncEnabled
-                        ? createAsyncVectorSearchOperator()
+                        ? createAsyncVectorSearchOperator(
+                                searchTable,
+                                config,
+                                planner.getFlinkContext().getClassLoader(),
+                                (AsyncVectorSearchFunction) 
vectorSearchFunction,
+                                dataTypeFactory,
+                                inputType,
+                                vectorSearchSpec.getOutputType(),
+                                outputType)
                         : createSyncVectorSearchOperator(
                                 searchTable,
                                 config,
                                 planner.getFlinkContext().getClassLoader(),
                                 (VectorSearchFunction) vectorSearchFunction,
-                                
ShortcutUtils.unwrapContext(planner.getFlinkContext())
-                                        .getCatalogManager()
-                                        .getDataTypeFactory(),
+                                dataTypeFactory,
                                 inputType,
                                 vectorSearchSpec.getOutputType(),
                                 outputType);
@@ -225,7 +241,49 @@ public class StreamExecVectorSearchTableFunction extends 
ExecNodeBase<RowData>
                 searchOutputType.getFieldCount());
     }
 
-    private SimpleOperatorFactory<RowData> createAsyncVectorSearchOperator() {
-        throw new UnsupportedOperationException("Async vector search is not 
supported yet.");
+    @SuppressWarnings("unchecked")
+    private StreamOperatorFactory<RowData> createAsyncVectorSearchOperator(
+            RelOptTable searchTable,
+            ExecNodeConfig config,
+            ClassLoader jobClassLoader,
+            AsyncVectorSearchFunction vectorSearchFunction,
+            DataTypeFactory dataTypeFactory,
+            RowType inputType,
+            RowType searchOutputType,
+            RowType outputType) {
+        ArrayList<FunctionCallUtil.FunctionParam> parameters =
+                new ArrayList<>(1 + 
vectorSearchSpec.getSearchColumns().size());
+        parameters.add(vectorSearchSpec.getTopK());
+        parameters.addAll(vectorSearchSpec.getSearchColumns().values());
+
+        
FunctionCallCodeGenerator.GeneratedTableFunctionWithDataType<AsyncFunction<RowData,
 Object>>
+                generatedFetcher =
+                        
VectorSearchCodeGenerator.generateAsyncVectorSearchFunction(
+                                config,
+                                jobClassLoader,
+                                dataTypeFactory,
+                                inputType,
+                                searchOutputType,
+                                outputType,
+                                parameters,
+                                vectorSearchFunction,
+                                ((TableSourceTable) searchTable)
+                                        .contextResolvedTable()
+                                        .getIdentifier()
+                                        .asSummaryString());
+
+        boolean isLeftOuterJoin = vectorSearchSpec.getJoinType() == 
JoinRelType.LEFT;
+
+        Preconditions.checkNotNull(asyncOptions, "Async Options can not be 
null.");
+
+        return new AsyncWaitOperatorFactory<>(
+                new AsyncVectorSearchRunner(
+                        (GeneratedFunction) generatedFetcher.tableFunc(),
+                        isLeftOuterJoin,
+                        asyncOptions.asyncBufferCapacity,
+                        searchOutputType.getFieldCount()),
+                asyncOptions.asyncTimeout,
+                asyncOptions.asyncBufferCapacity,
+                asyncOptions.asyncOutputMode);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/VectorSearchCodeGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/VectorSearchCodeGenerator.scala
index 1b533b366f3..87399303699 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/VectorSearchCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/VectorSearchCodeGenerator.scala
@@ -19,9 +19,11 @@ package org.apache.flink.table.planner.codegen
 
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.configuration.ReadableConfig
+import org.apache.flink.streaming.api.functions.async.AsyncFunction
 import org.apache.flink.table.catalog.DataTypeFactory
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.functions._
+import 
org.apache.flink.table.planner.codegen.FunctionCallCodeGenerator.GeneratedTableFunctionWithDataType
 import org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil
 import org.apache.flink.table.planner.functions.inference.FunctionCallContext
 import org.apache.flink.table.planner.plan.utils.FunctionCallUtil.FunctionParam
@@ -68,6 +70,32 @@ object VectorSearchCodeGenerator {
       .tableFunc
   }
 
+  /** Generates an async vector search function ([[AsyncTableFunction]]) */
+  def generateAsyncVectorSearchFunction(
+      tableConfig: ReadableConfig,
+      classLoader: ClassLoader,
+      dataTypeFactory: DataTypeFactory,
+      inputType: LogicalType,
+      searchOutputType: LogicalType,
+      outputType: LogicalType,
+      searchColumns: util.List[FunctionParam],
+      asyncVectorSearchFunction: AsyncTableFunction[_],
+      functionName: String): 
GeneratedTableFunctionWithDataType[AsyncFunction[RowData, AnyRef]] = {
+    FunctionCallCodeGenerator.generateAsyncFunctionCall(
+      tableConfig,
+      classLoader,
+      dataTypeFactory,
+      inputType,
+      searchOutputType,
+      outputType,
+      searchColumns,
+      asyncVectorSearchFunction,
+      generateCallWithDataType(functionName, searchOutputType),
+      functionName,
+      "AsyncVectorSearchFunction"
+    )
+  }
+
   private def generateCallWithDataType(
       functionName: String,
       searchOutputType: LogicalType
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
index 7f69919e3ef..df439bd2c9e 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
@@ -54,6 +54,7 @@ import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.data.conversion.RowRowConverter;
 import org.apache.flink.table.data.utils.JoinedRowData;
 import org.apache.flink.table.functions.AsyncLookupFunction;
+import org.apache.flink.table.functions.AsyncVectorSearchFunction;
 import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.functions.LookupFunction;
 import org.apache.flink.table.functions.VectorSearchFunction;
@@ -74,6 +75,8 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.clock.RelativeClock;
 import org.apache.flink.util.clock.SystemClock;
 
+import javax.annotation.Nullable;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -1171,4 +1174,51 @@ public final class TestValuesRuntimeFunctions {
             return sum;
         }
     }
+
+    public static class TestValueAsyncVectorSearchFunction extends 
AsyncVectorSearchFunction {
+
+        private final TestValueVectorSearchFunction impl;
+        private final @Nullable Integer latency;
+        private transient ExecutorService executors;
+        private transient Random random;
+
+        public TestValueAsyncVectorSearchFunction(
+                List<Row> data,
+                int[] searchIndices,
+                DataType physicalRowType,
+                @Nullable Integer latency) {
+            this.impl = new TestValueVectorSearchFunction(data, searchIndices, 
physicalRowType);
+            this.latency = latency;
+        }
+
+        @Override
+        public void open(FunctionContext context) throws Exception {
+            super.open(context);
+            impl.open(context);
+            executors = Executors.newCachedThreadPool();
+            random = new Random();
+        }
+
+        @Override
+        public CompletableFuture<Collection<RowData>> asyncVectorSearch(
+                int topK, RowData queryData) {
+            return CompletableFuture.supplyAsync(
+                    () -> {
+                        try {
+                            Thread.sleep(latency == null ? 
random.nextInt(1000) : latency);
+                            return impl.vectorSearch(topK, queryData);
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    },
+                    executors);
+        }
+
+        @Override
+        public void close() throws Exception {
+            super.close();
+            impl.close();
+            executors.shutdown();
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index d31ef6531ea..cebde11ac66 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -83,6 +83,7 @@ import 
org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
 import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
 import 
org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
 import 
org.apache.flink.table.connector.source.lookup.cache.trigger.PeriodicCacheReloadTrigger;
+import 
org.apache.flink.table.connector.source.search.AsyncVectorSearchFunctionProvider;
 import 
org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -95,9 +96,11 @@ import 
org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.functions.AsyncLookupFunction;
 import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.AsyncVectorSearchFunction;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.LookupFunction;
 import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.VectorSearchFunction;
 import org.apache.flink.table.legacy.api.TableSchema;
 import org.apache.flink.table.legacy.api.WatermarkSpec;
 import 
org.apache.flink.table.legacy.connector.source.AsyncTableFunctionProvider;
@@ -501,6 +504,14 @@ public final class TestValuesTableFactory
                             "Option to specify the amount of time to sleep 
after processing every N elements. "
                                     + "The default value is 0, which means 
that no sleep is performed");
 
+    public static final ConfigOption<Integer> LATENCY =
+            ConfigOptions.key("latency")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Latency in milliseconds for async vector search 
call for each row. "
+                                    + "If not set, the default is random 
between 0ms and 1000ms.");
+
     /**
      * Parse partition list from Options with the format as
      * "key1:val1,key2:val2;key1:val3,key2:val4".
@@ -654,7 +665,9 @@ public final class TestValuesTableFactory
                         readableMetadata,
                         null,
                         parallelism,
-                        enableAggregatePushDown);
+                        enableAggregatePushDown,
+                        isAsync,
+                        helper.getOptions().get(LATENCY));
             }
 
             if (disableLookup) {
@@ -888,7 +901,8 @@ public final class TestValuesTableFactory
                         FULL_CACHE_PERIODIC_RELOAD_INTERVAL,
                         FULL_CACHE_PERIODIC_RELOAD_SCHEDULE_MODE,
                         FULL_CACHE_TIMED_RELOAD_ISO_TIME,
-                        FULL_CACHE_TIMED_RELOAD_INTERVAL_IN_DAYS));
+                        FULL_CACHE_TIMED_RELOAD_INTERVAL_IN_DAYS,
+                        LATENCY));
     }
 
     private static int validateAndExtractRowtimeIndex(
@@ -1054,7 +1068,7 @@ public final class TestValuesTableFactory
         private @Nullable int[] groupingSet;
         private List<AggregateExpression> aggregateExpressions;
         private List<String> acceptedPartitionFilterFields;
-        private final Integer parallelism;
+        protected final Integer parallelism;
 
         private TestValuesScanTableSourceWithoutProjectionPushDown(
                 DataType producedDataType,
@@ -2247,6 +2261,9 @@ public final class TestValuesTableFactory
             extends TestValuesScanTableSourceWithoutProjectionPushDown
             implements VectorSearchTableSource {
 
+        private final boolean isAsync;
+        @Nullable private final Integer latency;
+
         private TestValuesVectorSearchTableSourceWithoutProjectionPushDown(
                 DataType producedDataType,
                 ChangelogMode changelogMode,
@@ -2266,7 +2283,9 @@ public final class TestValuesTableFactory
                 Map<String, DataType> readableMetadata,
                 @Nullable int[] projectedMetadataFields,
                 @Nullable Integer parallelism,
-                boolean enableAggregatePushDown) {
+                boolean enableAggregatePushDown,
+                boolean isAsync,
+                @Nullable Integer latency) {
             super(
                     producedDataType,
                     changelogMode,
@@ -2287,6 +2306,8 @@ public final class TestValuesTableFactory
                     projectedMetadataFields,
                     parallelism,
                     enableAggregatePushDown);
+            this.isAsync = isAsync;
+            this.latency = latency;
         }
 
         @Override
@@ -2295,9 +2316,67 @@ public final class TestValuesTableFactory
                     Arrays.stream(context.getSearchColumns()).mapToInt(k -> 
k[0]).toArray();
             Collection<Row> rows =
                     data.getOrDefault(Collections.emptyMap(), 
Collections.emptyList());
-            return VectorSearchFunctionProvider.of(
+            TestValuesRuntimeFunctions.TestValueVectorSearchFunction 
searchFunction =
                     new 
TestValuesRuntimeFunctions.TestValueVectorSearchFunction(
-                            new ArrayList<>(rows), searchColumns, 
producedDataType));
+                            new ArrayList<>(rows), searchColumns, 
producedDataType);
+
+            if (isAsync) {
+                return new VectorFunctionProvider(
+                        new 
TestValuesRuntimeFunctions.TestValueAsyncVectorSearchFunction(
+                                new ArrayList<>(rows), searchColumns, 
producedDataType, latency),
+                        searchFunction);
+            } else {
+                return VectorSearchFunctionProvider.of(searchFunction);
+            }
+        }
+
+        @Override
+        public DynamicTableSource copy() {
+            return new 
TestValuesVectorSearchTableSourceWithoutProjectionPushDown(
+                    producedDataType,
+                    changelogMode,
+                    boundedness,
+                    terminating,
+                    runtimeSource,
+                    failingSource,
+                    data,
+                    nestedProjectionSupported,
+                    projectedPhysicalFields,
+                    filterPredicates,
+                    filterableFields,
+                    dynamicFilteringFields,
+                    numElementToSkip,
+                    limit,
+                    allPartitions,
+                    readableMetadata,
+                    projectedMetadataFields,
+                    parallelism,
+                    enableAggregatePushDown,
+                    isAsync,
+                    latency);
+        }
+
+        private static class VectorFunctionProvider
+                implements AsyncVectorSearchFunctionProvider, 
VectorSearchFunctionProvider {
+
+            private final AsyncVectorSearchFunction asyncFunction;
+            private final VectorSearchFunction syncFunction;
+
+            public VectorFunctionProvider(
+                    AsyncVectorSearchFunction asyncFunction, 
VectorSearchFunction syncFunction) {
+                this.asyncFunction = asyncFunction;
+                this.syncFunction = syncFunction;
+            }
+
+            @Override
+            public AsyncVectorSearchFunction createAsyncVectorSearchFunction() 
{
+                return asyncFunction;
+            }
+
+            @Override
+            public VectorSearchFunction createVectorSearchFunction() {
+                return syncFunction;
+            }
         }
     }
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncVectorSearchITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncVectorSearchITCase.java
new file mode 100644
index 00000000000..1b67730805e
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncVectorSearchITCase.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table;
+
+import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThatList;
+
+/** ITCase for async VECTOR_SEARCH. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class AsyncVectorSearchITCase extends StreamingWithStateTestBase {
+
+    public AsyncVectorSearchITCase(StateBackendMode state) {
+        super(state);
+    }
+
+    private final List<Row> data =
+            Arrays.asList(
+                    Row.of(1L, new Float[] {5f, 12f, 13f}),
+                    Row.of(2L, new Float[] {11f, 60f, 61f}),
+                    Row.of(3L, new Float[] {8f, 15f, 17f}));
+
+    private final List<Row> nullableData =
+            Arrays.asList(Row.of(1L, new Float[] {5f, 12f, 13f}), Row.of(4L, 
null));
+
+    @BeforeEach
+    public void before() {
+        super.before();
+        createTable("src", data);
+        createTable("nullableSrc", nullableData);
+        createTable("vector", data);
+    }
+
+    @TestTemplate
+    void testSimple() {
+        List<Row> actual =
+                CollectionUtil.iteratorToList(
+                        tEnv().executeSql(
+                                        "SELECT * FROM src, LATERAL 
TABLE(VECTOR_SEARCH(TABLE vector, DESCRIPTOR(`vector`), src.vector, 2))")
+                                .collect());
+        assertThatList(actual)
+                .containsExactlyInAnyOrder(
+                        Row.of(
+                                1L,
+                                new Float[] {5.0f, 12.0f, 13.0f},
+                                1L,
+                                new Float[] {5.0f, 12.0f, 13.0f},
+                                1.0),
+                        Row.of(
+                                1L,
+                                new Float[] {5.0f, 12.0f, 13.0f},
+                                3L,
+                                new Float[] {8f, 15f, 17f},
+                                0.9977375565610862),
+                        Row.of(
+                                2L,
+                                new Float[] {11f, 60f, 61f},
+                                2L,
+                                new Float[] {11f, 60f, 61f},
+                                1.0),
+                        Row.of(
+                                2L,
+                                new Float[] {11f, 60f, 61f},
+                                1L,
+                                new Float[] {5.0f, 12.0f, 13.0f},
+                                0.9886506935687265),
+                        Row.of(
+                                3L,
+                                new Float[] {8f, 15f, 17f},
+                                3L,
+                                new Float[] {8f, 15f, 17f},
+                                1.0000000000000002),
+                        Row.of(
+                                3L,
+                                new Float[] {8f, 15f, 17f},
+                                1L,
+                                new Float[] {5.0f, 12.0f, 13.0f},
+                                0.9977375565610862));
+    }
+
+    @TestTemplate
+    void testLeftLateralJoin() {
+        List<Row> actual =
+                CollectionUtil.iteratorToList(
+                        tEnv().executeSql(
+                                        "SELECT * FROM nullableSrc LEFT JOIN 
LATERAL TABLE(VECTOR_SEARCH(TABLE vector, DESCRIPTOR(`vector`), 
nullableSrc.vector, 2)) ON TRUE")
+                                .collect());
+        assertThatList(actual)
+                .containsExactlyInAnyOrder(
+                        Row.of(
+                                1L,
+                                new Float[] {5.0f, 12.0f, 13.0f},
+                                1L,
+                                new Float[] {5.0f, 12.0f, 13.0f},
+                                1.0),
+                        Row.of(
+                                1L,
+                                new Float[] {5.0f, 12.0f, 13.0f},
+                                3L,
+                                new Float[] {8f, 15f, 17f},
+                                0.9977375565610862),
+                        Row.of(4L, null, null, null, null));
+    }
+
+    @TestTemplate
+    void testTimeout() {
+        tEnv().getConfig()
+                .set(
+                        
ExecutionConfigOptions.TABLE_EXEC_ASYNC_VECTOR_SEARCH_TIMEOUT,
+                        Duration.ofMillis(100));
+        assertThatThrownBy(
+                        () ->
+                                CollectionUtil.iteratorToList(
+                                        tEnv().executeSql(
+                                                        "SELECT * FROM 
nullableSrc LEFT JOIN LATERAL TABLE(VECTOR_SEARCH(TABLE vector, 
DESCRIPTOR(`vector`), nullableSrc.vector, 2)) ON TRUE")
+                                                .collect()))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                TimeoutException.class, "Async function call 
has timed out."));
+    }
+
+    @TestTemplate
+    void testVectorSearchWithCalc() {
+        assertThatThrownBy(
+                        () ->
+                                tEnv().executeSql(
+                                                "SELECT * FROM nullableSrc\n "
+                                                        + "LEFT JOIN LATERAL 
TABLE(VECTOR_SEARCH((SELECT id+1, vector FROM vector), DESCRIPTOR(`vector`), 
nullableSrc.vector, 2)) ON TRUE"))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                "Don't support calc on VECTOR_SEARCH node 
now."));
+    }
+
+    @Parameters(name = "backend = {0}, objectReuse = {1}, asyncOutputMode = 
{2}")
+    public static Collection<Object[]> parameters() {
+        return Arrays.asList(
+                new Object[][] {
+                    {
+                        StreamingWithStateTestBase.HEAP_BACKEND(),
+                        true,
+                        ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED
+                    },
+                    {
+                        StreamingWithStateTestBase.HEAP_BACKEND(),
+                        true,
+                        ExecutionConfigOptions.AsyncOutputMode.ORDERED
+                    },
+                    {
+                        StreamingWithStateTestBase.HEAP_BACKEND(),
+                        false,
+                        ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED
+                    },
+                    {
+                        StreamingWithStateTestBase.HEAP_BACKEND(),
+                        false,
+                        ExecutionConfigOptions.AsyncOutputMode.ORDERED
+                    },
+                    {
+                        StreamingWithStateTestBase.ROCKSDB_BACKEND(),
+                        true,
+                        ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED
+                    },
+                    {
+                        StreamingWithStateTestBase.ROCKSDB_BACKEND(),
+                        true,
+                        ExecutionConfigOptions.AsyncOutputMode.ORDERED
+                    },
+                    {
+                        StreamingWithStateTestBase.ROCKSDB_BACKEND(),
+                        false,
+                        ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED
+                    },
+                    {
+                        StreamingWithStateTestBase.ROCKSDB_BACKEND(),
+                        false,
+                        ExecutionConfigOptions.AsyncOutputMode.ORDERED
+                    }
+                });
+    }
+
+    private void createTable(String tableName, List<Row> data) {
+        String dataId = TestValuesTableFactory.registerData(data);
+        tEnv().executeSql(
+                        String.format(
+                                "CREATE TABLE `%s`(\n"
+                                        + "  id BIGINT,\n"
+                                        + "  vector ARRAY<FLOAT>\n"
+                                        + ") WITH (\n"
+                                        + "  'connector' = 'values',\n"
+                                        + "  'enable-vector-search' = 
'true',\n"
+                                        + "  'data-id' = '%s',\n"
+                                        + "  'async' = 'true',\n"
+                                        + "  'latency' = '1000'"
+                                        + ")",
+                                tableName, dataId));
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/search/AsyncVectorSearchRunner.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/search/AsyncVectorSearchRunner.java
new file mode 100644
index 00000000000..b9ff849d708
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/search/AsyncVectorSearchRunner.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.search;
+
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.CollectionSupplier;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.functions.AsyncVectorSearchFunction;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.runtime.operators.AbstractAsyncFunctionRunner;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Async function runner for {@link AsyncVectorSearchFunction}, which takes 
the generated function,
+ * instantiates it, and then calls its lifecycle methods.
+ */
+public class AsyncVectorSearchRunner extends 
AbstractAsyncFunctionRunner<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final boolean isLeftOuterJoin;
+    private final int asyncBufferCapacity;
+    private final int searchTableFieldCount;
+
+    /**
+     * Buffers {@link ResultFuture} to avoid newInstance cost when processing 
elements every time.
+     * We use {@link BlockingQueue} to make sure the head {@link 
ResultFuture}s are available.
+     */
+    private transient BlockingQueue<JoinedRowResultFuture> resultFutureBuffer;
+
+    public AsyncVectorSearchRunner(
+            GeneratedFunction<AsyncFunction<RowData, RowData>> 
generatedFetcher,
+            boolean isLeftOuterJoin,
+            int asyncBufferCapacity,
+            int searchTableFieldCount) {
+        super(generatedFetcher);
+        this.isLeftOuterJoin = isLeftOuterJoin;
+        this.asyncBufferCapacity = asyncBufferCapacity;
+        this.searchTableFieldCount = searchTableFieldCount;
+    }
+
+    @Override
+    public void open(OpenContext openContext) throws Exception {
+        super.open(openContext);
+        this.resultFutureBuffer = new ArrayBlockingQueue<>(asyncBufferCapacity 
+ 1);
+        // asyncBufferCapacity + 1 as the queue size in order to avoid
+        // blocking on the queue when taking a collector.
+        for (int i = 0; i < asyncBufferCapacity + 1; i++) {
+            resultFutureBuffer.add(
+                    new JoinedRowResultFuture(
+                            resultFutureBuffer, isLeftOuterJoin, 
searchTableFieldCount));
+        }
+    }
+
+    @Override
+    public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) 
throws Exception {
+        JoinedRowResultFuture wrapper = resultFutureBuffer.take();
+        wrapper.reset(input, resultFuture);
+        fetcher.asyncInvoke(input, wrapper);
+    }
+
+    private static final class JoinedRowResultFuture implements 
ResultFuture<RowData> {
+
+        private final BlockingQueue<JoinedRowResultFuture> resultFutureBuffer;
+        private final boolean isLeftOuterJoin;
+        private final GenericRowData nullRow;
+
+        private RowData leftRow;
+        private ResultFuture<RowData> realOutput;
+
+        private JoinedRowResultFuture(
+                BlockingQueue<JoinedRowResultFuture> resultFutureBuffer,
+                boolean isLeftOuterJoin,
+                int searchTableArity) {
+            this.resultFutureBuffer = resultFutureBuffer;
+            this.isLeftOuterJoin = isLeftOuterJoin;
+            this.nullRow = new GenericRowData(searchTableArity);
+        }
+
+        public void reset(RowData leftRow, ResultFuture<RowData> realOutput) {
+            this.leftRow = leftRow;
+            this.realOutput = realOutput;
+        }
+
+        @Override
+        public void complete(Collection<RowData> result) {
+            if (result == null || result.isEmpty()) {
+                if (isLeftOuterJoin) {
+                    RowData outRow = new JoinedRowData(leftRow.getRowKind(), 
leftRow, nullRow);
+                    realOutput.complete(Collections.singleton(outRow));
+                } else {
+                    realOutput.complete(Collections.emptyList());
+                }
+            } else {
+                List<RowData> outRows = new ArrayList<>();
+                for (RowData right : result) {
+                    RowData outRow = new JoinedRowData(leftRow.getRowKind(), 
leftRow, right);
+                    outRows.add(outRow);
+                }
+                realOutput.complete(outRows);
+            }
+            try {
+                // put this collector to the queue to avoid this collector is 
used
+                // again before outRows in the collector is not consumed.
+                resultFutureBuffer.put(this);
+            } catch (InterruptedException e) {
+                completeExceptionally(e);
+            }
+        }
+
+        @Override
+        public void completeExceptionally(Throwable error) {
+            realOutput.completeExceptionally(error);
+        }
+
+        @Override
+        public void complete(CollectionSupplier<RowData> supplier) {
+            throw new UnsupportedOperationException();
+        }
+    }
+}


Reply via email to