This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f46693e6a7 [FLINK-38430][table] Support runtime config for 
VECTOR_SEARCH (#27129)
4f46693e6a7 is described below

commit 4f46693e6a7f26a4525389f594f7d84e42683b0b
Author: Shengkai <[email protected]>
AuthorDate: Mon Oct 27 09:20:49 2025 +0800

    [FLINK-38430][table] Support runtime config for VECTOR_SEARCH (#27129)
---
 .../generated/execution_config_configuration.html  |   6 -
 ...vector_search_runtime_config_configuration.html |  36 +++++
 .../ConfigOptionsDocsCompletenessITCase.java       |  25 ++-
 .../table/api/config/ExecutionConfigOptions.java   |   7 -
 .../config/VectorSearchRuntimeConfigOptions.java   |  84 ++++++++++
 .../functions/sql/ml/SqlMLTableFunction.java       |  38 +----
 .../sql/ml/SqlVectorSearchTableFunction.java       | 129 +++++++++++++--
 .../planner/functions/utils/SqlValidatorUtils.java |  43 +++++
 .../plan/nodes/exec/spec/VectorSearchSpec.java     |  13 ++
 .../StreamExecVectorSearchTableFunction.java       |   8 +-
 .../StreamPhysicalMLPredictTableFunctionRule.java  |  26 +--
 .../StreamPhysicalVectorSearchTableFunction.java   |  23 ++-
 ...treamPhysicalVectorSearchTableFunctionRule.java |   7 +
 .../table/planner/plan/utils/FunctionCallUtil.java |  31 ++++
 .../table/planner/plan/utils/VectorSearchUtil.java |  89 +++++++++--
 .../planner/factories/TestValuesTableFactory.java  |  10 ++
 .../nodes/exec/stream/VectorSearchRestoreTest.java |   4 +-
 .../exec/stream/VectorSearchTestPrograms.java      |  18 +++
 .../stream/sql/MLPredictTableFunctionTest.java     |   2 +-
 .../stream/sql/VectorSearchTableFunctionTest.java  |  94 ++++++++++-
 .../stream/table/AsyncVectorSearchITCase.java      |  13 ++
 .../stream/sql/VectorSearchTableFunctionTest.xml   |  38 ++++-
 .../plan/vector-search-with-runtime-config.json    | 174 +++++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 8393 bytes
 24 files changed, 803 insertions(+), 115 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/execution_config_configuration.html 
b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index 180c4a48284..e236499de28 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -117,12 +117,6 @@ ORDERED ensures that the operator emits the result in the 
same order as the data
             <td>Duration</td>
             <td>The async timeout for the asynchronous operation to complete, 
including any retries which may occur.</td>
         </tr>
-        <tr>
-            <td><h5>table.exec.async-vector-search.async</h5><br> <span 
class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to run an async search function or not. Default to 
false.</td>
-        </tr>
         <tr>
             
<td><h5>table.exec.async-vector-search.max-concurrent-operations</h5><br> <span 
class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">10</td>
diff --git 
a/docs/layouts/shortcodes/generated/vector_search_runtime_config_configuration.html
 
b/docs/layouts/shortcodes/generated/vector_search_runtime_config_configuration.html
new file mode 100644
index 00000000000..a7eb6d30fb9
--- /dev/null
+++ 
b/docs/layouts/shortcodes/generated/vector_search_runtime_config_configuration.html
@@ -0,0 +1,36 @@
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>async</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Boolean</td>
+            <td>Value can be 'true' or 'false' to suggest the planner choose 
the corresponding search function. If the backend search function provider 
does not support the suggested mode, an exception is thrown to notify 
users.</td>
+        </tr>
+        <tr>
+            <td><h5>max-concurrent-operations</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>The max number of async i/o operation that the async vector 
search call can trigger.</td>
+        </tr>
+        <tr>
+            <td><h5>output-mode</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td><p>Enum</p></td>
+            <td>Output mode for asynchronous vector search call operations 
which will convert to {@see AsyncDataStream.OutputMode}, ORDERED by default. If 
set to ALLOW_UNORDERED, will attempt to use {@see 
AsyncDataStream.OutputMode.UNORDERED} when it does not affect the correctness 
of the result, otherwise ORDERED will be still used.<br /><br />Possible 
values:<ul><li>"ORDERED"</li><li>"ALLOW_UNORDERED"</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>timeout</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>Timeout from first invoke to final completion of asynchronous 
vector search call operation, may include multiple retries, and will be reset 
in case of failover.</td>
+        </tr>
+    </tbody>
+</table>
diff --git 
a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessITCase.java
 
b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessITCase.java
index 7cfbc5f7384..f0060271a02 100644
--- 
a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessITCase.java
+++ 
b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessITCase.java
@@ -33,13 +33,15 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -58,9 +60,16 @@ import static org.assertj.core.api.Fail.fail;
  */
 class ConfigOptionsDocsCompletenessITCase {
 
+    private static final Set<String> DEDUPLICATE_CHECK_EXCLUSIONS =
+            new HashSet<>(
+                    Arrays.asList(
+                            
"org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions",
+                            
"org.apache.flink.table.api.config.VectorSearchRuntimeConfigOptions"));
+
     @Test
     void testCompleteness() throws Exception {
         final Map<String, List<DocumentedOption>> documentedOptions = 
parseDocumentedOptions();
+
         final Map<String, List<ExistingOption>> existingOptions =
                 findExistingOptions(ignored -> true);
 
@@ -76,10 +85,20 @@ class ConfigOptionsDocsCompletenessITCase {
                 .map(
                         (entry) -> {
                             final List<ExistingOption> existingOptions = 
entry.getValue();
-                            final List<ExistingOption> consolidated;
+                            final List<ExistingOption> consolidated = new 
ArrayList<>();
 
                             Optional<ExistingOption> deduped =
                                     existingOptions.stream()
+                                            .filter(
+                                                    option -> {
+                                                        if 
(DEDUPLICATE_CHECK_EXCLUSIONS.contains(
+                                                                
option.containingClass.getName())) {
+                                                            
consolidated.add(option);
+                                                            return false;
+                                                        } else {
+                                                            return true;
+                                                        }
+                                                    })
                                             .reduce(
                                                     (option1, option2) -> {
                                                         if 
(option1.equals(option2)) {
@@ -125,7 +144,7 @@ class ConfigOptionsDocsCompletenessITCase {
                                                             }
                                                         }
                                                     });
-                            consolidated = 
Collections.singletonList(deduped.get());
+                            deduped.ifPresent(consolidated::add);
 
                             return new Tuple2<>(entry.getKey(), consolidated);
                         })
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 5632f1510ef..aa02daa498c 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -603,13 +603,6 @@ public class ExecutionConfigOptions {
     // ------------------------------------------------------------------------
     //  Async VECTOR_SEARCH Options
     // ------------------------------------------------------------------------
-    @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
-    public static final ConfigOption<Boolean> 
TABLE_EXEC_ASYNC_VECTOR_SEARCH_ASYNC =
-            key("table.exec.async-vector-search.async")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "Whether to run an async search function or not. 
Default to false.");
 
     @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
     public static final ConfigOption<Integer>
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/VectorSearchRuntimeConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/VectorSearchRuntimeConfigOptions.java
new file mode 100644
index 00000000000..5a477cd2277
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/VectorSearchRuntimeConfigOptions.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+
+import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableSet;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds option name definitions for VECTOR_SEARCH runtime config 
based on {@link
+ * ConfigOption}.
+ */
+@PublicEvolving
+public class VectorSearchRuntimeConfigOptions {
+
+    public static final ConfigOption<Boolean> ASYNC =
+            key("async")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Value can be 'true' or 'false' to suggest the 
planner choose the corresponding"
+                                    + " predict function. If the backend 
search function provider does not support the"
+                                    + " suggested mode, it will throw 
exception to notify users.");
+
+    public static final ConfigOption<ExecutionConfigOptions.AsyncOutputMode> 
ASYNC_OUTPUT_MODE =
+            key("output-mode")
+                    .enumType(ExecutionConfigOptions.AsyncOutputMode.class)
+                    .noDefaultValue()
+                    .withDescription(
+                            "Output mode for asynchronous vector search call 
operations which will convert to {@see AsyncDataStream.OutputMode}, ORDERED by 
default. "
+                                    + "If set to ALLOW_UNORDERED, will attempt 
to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not "
+                                    + "affect the correctness of the result, 
otherwise ORDERED will be still used.");
+
+    public static final ConfigOption<Integer> ASYNC_MAX_CONCURRENT_OPERATIONS =
+            key("max-concurrent-operations")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The max number of async i/o operation that the 
async vector search call can trigger.");
+
+    public static final ConfigOption<Duration> ASYNC_TIMEOUT =
+            key("timeout")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Timeout from first invoke to final completion of 
asynchronous vector search call operation, may include multiple"
+                                    + " retries, and will be reset in case of 
failover.");
+
+    private static final Set<ConfigOption<?>> supportedKeys = new HashSet<>();
+
+    static {
+        supportedKeys.add(ASYNC);
+        supportedKeys.add(ASYNC_OUTPUT_MODE);
+        supportedKeys.add(ASYNC_MAX_CONCURRENT_OPERATIONS);
+        supportedKeys.add(ASYNC_TIMEOUT);
+    }
+
+    public static ImmutableSet<ConfigOption> getSupportedOptions() {
+        return ImmutableSet.copyOf(supportedKeys);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLTableFunction.java
index d88884d8c74..6c1c289d48c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLTableFunction.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.functions.sql.ml;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions;
+import org.apache.flink.table.api.config.VectorSearchRuntimeConfigOptions;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
 import org.apache.flink.types.Either;
@@ -28,8 +28,6 @@ import org.apache.flink.types.Either;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlCallBinding;
-import org.apache.calcite.sql.SqlCharStringLiteral;
-import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlIdentifier;
@@ -45,7 +43,6 @@ import org.apache.calcite.sql.type.SqlReturnTypeInference;
 import org.apache.calcite.sql.validate.SqlNameMatcher;
 import org.apache.calcite.sql.validate.SqlValidator;
 import org.apache.calcite.sql.validate.SqlValidatorScope;
-import org.apache.calcite.util.NlsString;
 import org.apache.calcite.util.Util;
 
 import java.util.HashMap;
@@ -56,6 +53,7 @@ import java.util.Optional;
 import static 
org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions.ASYNC;
 import static 
org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions.ASYNC_MAX_CONCURRENT_OPERATIONS;
 import static 
org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType;
+import static 
org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.reduceLiteralToString;
 import static 
org.apache.flink.table.types.logical.LogicalTypeFamily.CHARACTER_STRING;
 
 /**
@@ -219,9 +217,9 @@ public abstract class SqlMLTableFunction extends 
SqlFunction implements SqlTable
         Map<String, String> runtimeConfig = new HashMap<>();
         for (int i = 0; i < operands.size(); i += 2) {
             Either<String, RuntimeException> key =
-                    reduceLiteral(operands.get(i), callBinding.getValidator());
+                    reduceLiteralToString(operands.get(i), 
callBinding.getValidator());
             Either<String, RuntimeException> value =
-                    reduceLiteral(operands.get(i + 1), 
callBinding.getValidator());
+                    reduceLiteralToString(operands.get(i + 1), 
callBinding.getValidator());
 
             if (key.isRight()) {
                 return Optional.of(key.right());
@@ -235,10 +233,10 @@ public abstract class SqlMLTableFunction extends 
SqlFunction implements SqlTable
         return checkConfigValue(runtimeConfig);
     }
 
-    private static Optional<RuntimeException> checkConfigValue(Map<String, 
String> runtimeConfig) {
+    public static Optional<RuntimeException> checkConfigValue(Map<String, 
String> runtimeConfig) {
         Configuration config = Configuration.fromMap(runtimeConfig);
         try {
-            
MLPredictRuntimeConfigOptions.getSupportedOptions().forEach(config::get);
+            
VectorSearchRuntimeConfigOptions.getSupportedOptions().forEach(config::get);
         } catch (Throwable t) {
             return Optional.of(new ValidationException("Failed to parse the 
config.", t));
         }
@@ -260,28 +258,4 @@ public abstract class SqlMLTableFunction extends 
SqlFunction implements SqlTable
 
         return Optional.empty();
     }
-
-    private static Either<String, RuntimeException> reduceLiteral(
-            SqlNode operand, SqlValidator validator) {
-        if (operand instanceof SqlCharStringLiteral) {
-            return Either.Left(
-                    ((SqlCharStringLiteral) 
operand).getValueAs(NlsString.class).getValue());
-        } else if (operand.getKind() == SqlKind.CAST) {
-            // CAST(CAST('v' AS STRING) AS STRING)
-            SqlCall call = (SqlCall) operand;
-            SqlDataTypeSpec dataType = call.operand(1);
-            if 
(!toLogicalType(dataType.deriveType(validator)).is(CHARACTER_STRING)) {
-                return Either.Right(
-                        new ValidationException("Don't support to cast value 
to non-string type."));
-            }
-            return reduceLiteral((call.operand(0)), validator);
-        } else {
-            return Either.Right(
-                    new ValidationException(
-                            String.format(
-                                    "Unsupported expression %s is in runtime 
config at position %s. Currently, "
-                                            + "runtime config should be be a 
MAP of string literals.",
-                                    operand, operand.getParserPosition())));
-        }
-    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlVectorSearchTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlVectorSearchTableFunction.java
index 5499a076b2e..891c3f793ca 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlVectorSearchTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlVectorSearchTableFunction.java
@@ -18,12 +18,15 @@
 
 package org.apache.flink.table.planner.functions.sql.ml;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.VectorSearchRuntimeConfigOptions;
 import org.apache.flink.table.planner.functions.utils.SqlValidatorUtils;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
+import org.apache.flink.types.Either;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -39,6 +42,7 @@ import org.apache.calcite.sql.SqlOperandCountRange;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlOperatorBinding;
 import org.apache.calcite.sql.SqlTableFunction;
+import org.apache.calcite.sql.type.MapSqlType;
 import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.sql.type.SqlOperandCountRanges;
 import org.apache.calcite.sql.type.SqlOperandMetadata;
@@ -50,10 +54,17 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
 
+import static 
org.apache.flink.table.api.config.VectorSearchRuntimeConfigOptions.ASYNC;
+import static 
org.apache.flink.table.api.config.VectorSearchRuntimeConfigOptions.ASYNC_MAX_CONCURRENT_OPERATIONS;
 import static 
org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType;
+import static 
org.apache.flink.table.planner.functions.utils.SqlValidatorUtils.reduceLiteralToString;
+import static 
org.apache.flink.table.types.logical.LogicalTypeFamily.CHARACTER_STRING;
 
 /**
  * {@link SqlVectorSearchTableFunction} implements an operator for search.
@@ -65,6 +76,7 @@ import static 
org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalT
  *   <li>a descriptor to provide a column name from the input table
  *   <li>a query column from the left table
  *   <li>a literal value for top k
+ *   <li>an optional config map
  * </ol>
  */
 public class SqlVectorSearchTableFunction extends SqlFunction implements 
SqlTableFunction {
@@ -73,6 +85,7 @@ public class SqlVectorSearchTableFunction extends SqlFunction 
implements SqlTabl
     private static final String PARAM_COLUMN_TO_SEARCH = "COLUMN_TO_SEARCH";
     private static final String PARAM_COLUMN_TO_QUERY = "COLUMN_TO_QUERY";
     private static final String PARAM_TOP_K = "TOP_K";
+    private static final String PARAM_CONFIG = "CONFIG";
 
     private static final String OUTPUT_SCORE = "score";
 
@@ -92,7 +105,14 @@ public class SqlVectorSearchTableFunction extends 
SqlFunction implements SqlTabl
             @Override
             public @Nullable RelDataType inferReturnType(SqlOperatorBinding 
opBinding) {
                 final RelDataTypeFactory typeFactory = 
opBinding.getTypeFactory();
-                final RelDataType inputRowType = opBinding.getOperandType(0);
+                RelDataType inputRowType;
+                if (opBinding instanceof SqlCallBinding) {
+                    SqlCallBinding callBinding = (SqlCallBinding) opBinding;
+                    List<SqlNode> operands = callBinding.operands();
+                    inputRowType = 
callBinding.getValidator().getValidatedNodeType(operands.get(0));
+                } else {
+                    inputRowType = opBinding.getOperandType(0);
+                }
 
                 return typeFactory
                         .builder()
@@ -125,7 +145,10 @@ public class SqlVectorSearchTableFunction extends 
SqlFunction implements SqlTabl
                                 PARAM_SEARCH_TABLE,
                                 PARAM_COLUMN_TO_SEARCH,
                                 PARAM_COLUMN_TO_QUERY,
-                                PARAM_TOP_K));
+                                PARAM_TOP_K,
+                                PARAM_CONFIG));
+
+        private static final int OPTIONAL_ARG_IDX = 4;
 
         @Override
         public List<RelDataType> paramTypes(RelDataTypeFactory 
relDataTypeFactory) {
@@ -217,33 +240,113 @@ public class SqlVectorSearchTableFunction extends 
SqlFunction implements SqlTabl
                                                 topK))),
                         throwOnFailure);
             }
-            return true;
+
+            // check config type
+            return SqlValidatorUtils.throwExceptionOrReturnFalse(
+                    checkOptionalConfigOperands(
+                            callBinding,
+                            OPTIONAL_ARG_IDX,
+                            SqlVectorSearchTableFunction::checkConfigValue),
+                    throwOnFailure);
         }
 
         @Override
         public SqlOperandCountRange getOperandCountRange() {
-            return SqlOperandCountRanges.between(4, 4);
+            return SqlOperandCountRanges.between(4, 5);
         }
 
         @Override
         public String getAllowedSignatures(SqlOperator op, String opName) {
             return opName
-                    + "(TABLE search_table, DESCRIPTOR(column_to_search), 
column_to_query, top_k)";
+                    + "(TABLE search_table, DESCRIPTOR(column_to_search), 
column_to_query, top_k, [MAP['key1', 'value1']...])";
         }
 
         @Override
-        public Consistency getConsistency() {
-            return Consistency.NONE;
+        public boolean isOptional(int i) {
+            return i == OPTIONAL_ARG_IDX;
         }
+    }
 
-        @Override
-        public boolean isOptional(int i) {
-            return false;
+    /**
+     * Checks the optional config parameter. The config parameter is a map that defines 
some parameters and
+     * values.
+     *
+     * @param callBinding The call binding
+     * @param configLocation The location of the config parameter
+     * @param checkConfigValue Check value in the config map.
+     */
+    public static Optional<RuntimeException> checkOptionalConfigOperands(
+            SqlCallBinding callBinding,
+            int configLocation,
+            Function<Map<String, String>, Optional<RuntimeException>> 
checkConfigValue) {
+        if (callBinding.getOperandCount() <= configLocation) {
+            return Optional.empty();
         }
 
-        @Override
-        public boolean isFixedParameters() {
-            return true;
+        SqlNode configNode = callBinding.operand(configLocation);
+        if (!configNode.getKind().equals(SqlKind.MAP_VALUE_CONSTRUCTOR)) {
+            return Optional.of(new ValidationException("Config param should be 
a MAP."));
+        }
+
+        RelDataType mapType =
+                callBinding
+                        .getValidator()
+                        
.getValidatedNodeType(callBinding.operand(configLocation));
+
+        assert mapType instanceof MapSqlType;
+
+        LogicalType keyType = toLogicalType(mapType.getKeyType());
+        LogicalType valueType = toLogicalType(mapType.getValueType());
+        if (!keyType.is(CHARACTER_STRING) || !valueType.is(CHARACTER_STRING)) {
+            return Optional.of(
+                    new ValidationException(
+                            String.format(
+                                    "Config param can only be a MAP of string 
literals but node's type is %s at position %s.",
+                                    mapType, 
callBinding.operand(3).getParserPosition())));
         }
+
+        List<SqlNode> operands = ((SqlCall) configNode).getOperandList();
+        Map<String, String> runtimeConfig = new HashMap<>();
+        for (int i = 0; i < operands.size(); i += 2) {
+            Either<String, RuntimeException> key =
+                    reduceLiteralToString(operands.get(i), 
callBinding.getValidator());
+            Either<String, RuntimeException> value =
+                    reduceLiteralToString(operands.get(i + 1), 
callBinding.getValidator());
+
+            if (key.isRight()) {
+                return Optional.of(key.right());
+            } else if (value.isRight()) {
+                return Optional.of(value.right());
+            } else {
+                runtimeConfig.put(key.left(), value.left());
+            }
+        }
+        return checkConfigValue.apply(runtimeConfig);
+    }
+
+    public static Optional<RuntimeException> checkConfigValue(Map<String, 
String> runtimeConfig) {
+        Configuration config = Configuration.fromMap(runtimeConfig);
+        try {
+            
VectorSearchRuntimeConfigOptions.getSupportedOptions().forEach(config::get);
+        } catch (Throwable t) {
+            return Optional.of(new ValidationException("Failed to parse the 
config.", t));
+        }
+
+        // option value check
+        // async options are all optional
+        Boolean async = config.get(ASYNC);
+        if (Boolean.TRUE.equals(async)) {
+            Integer maxConcurrentOperations = 
config.get(ASYNC_MAX_CONCURRENT_OPERATIONS);
+            if (maxConcurrentOperations != null && maxConcurrentOperations <= 
0) {
+                return Optional.of(
+                        new ValidationException(
+                                String.format(
+                                        "Invalid runtime config option '%s'. 
Its value should be positive integer but was %s.",
+                                        ASYNC_MAX_CONCURRENT_OPERATIONS.key(),
+                                        maxConcurrentOperations)));
+            }
+        }
+
+        return Optional.empty();
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/utils/SqlValidatorUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/utils/SqlValidatorUtils.java
index 66b58499e09..8fd5d8fee05 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/utils/SqlValidatorUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/utils/SqlValidatorUtils.java
@@ -18,11 +18,16 @@
 
 package org.apache.flink.table.planner.functions.utils;
 
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.types.Either;
+
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
@@ -34,6 +39,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.sql.validate.SqlNameMatcher;
 import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.util.NlsString;
 import org.apache.calcite.util.Pair;
 
 import java.util.ArrayList;
@@ -43,6 +49,8 @@ import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.calcite.util.Static.RESOURCE;
+import static 
org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType;
+import static 
org.apache.flink.table.types.logical.LogicalTypeFamily.CHARACTER_STRING;
 
 /** Utility methods related to SQL validation. */
 public class SqlValidatorUtils {
@@ -191,6 +199,41 @@ public class SqlValidatorUtils {
         return result;
     }
 
+    public static Either<String, RuntimeException> reduceLiteralToString(
+            SqlNode operand, SqlValidator validator) {
+        if (operand instanceof SqlCharStringLiteral) {
+            return Either.Left(
+                    ((SqlCharStringLiteral) 
operand).getValueAs(NlsString.class).getValue());
+        } else if (operand.getKind() == SqlKind.CAST) {
+            // CAST(CAST('v' AS STRING) AS STRING)
+            SqlCall call = (SqlCall) operand;
+            SqlDataTypeSpec dataType = call.operand(1);
+            if 
(!toLogicalType(dataType.deriveType(validator)).is(CHARACTER_STRING)) {
+                return Either.Right(
+                        new ValidationException("Don't support to cast value 
to non-string type."));
+            }
+            SqlNode operand0 = call.operand(0);
+            if (operand0 instanceof SqlCharStringLiteral) {
+                return Either.Left(
+                        ((SqlCharStringLiteral) 
operand0).getValueAs(NlsString.class).getValue());
+            } else {
+                return Either.Right(
+                        new ValidationException(
+                                String.format(
+                                        "Unsupported expression %s is in 
runtime config at position %s. Currently, "
+                                                + "runtime config should be be 
a MAP of string literals.",
+                                        operand, 
operand.getParserPosition())));
+            }
+        } else {
+            return Either.Right(
+                    new ValidationException(
+                            String.format(
+                                    "Unsupported expression %s is in runtime 
config at position %s. Currently, "
+                                            + "runtime config should be be a 
MAP of string literals.",
+                                    operand, operand.getParserPosition())));
+        }
+    }
+
     private static SqlNode castTo(SqlNode node, RelDataType type) {
         return SqlStdOperatorTable.CAST.createCall(
                 SqlParserPos.ZERO,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/VectorSearchSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/VectorSearchSpec.java
index c20aad9f3fe..9b73a797f0d 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/VectorSearchSpec.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/VectorSearchSpec.java
@@ -27,6 +27,8 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 
 import org.apache.calcite.rel.core.JoinRelType;
 
+import javax.annotation.Nullable;
+
 import java.util.Map;
 
 /** VectorSearchSpec describes how vector search is performed. */
@@ -35,6 +37,7 @@ public class VectorSearchSpec {
     public static final String FIELD_NAME_JOIN_TYPE = "joinType";
     public static final String FIELD_NAME_SEARCH_COLUMNS = "searchColumns";
     public static final String FIELD_NAME_TOP_K = "topK";
+    public static final String FIELD_NAME_RUNTIME_CONFIG = "runtimeConfig";
     public static final String FIELD_NAME_OUTPUT_TYPE = "outputType";
 
     @JsonProperty(FIELD_NAME_JOIN_TYPE)
@@ -47,6 +50,9 @@ public class VectorSearchSpec {
     @JsonProperty(FIELD_NAME_TOP_K)
     private final FunctionParam topK;
 
+    @JsonProperty(FIELD_NAME_RUNTIME_CONFIG)
+    private final @Nullable Map<String, String> runtimeConfig;
+
     @JsonProperty(FIELD_NAME_OUTPUT_TYPE)
     private final RowType outputType;
 
@@ -55,10 +61,12 @@ public class VectorSearchSpec {
             @JsonProperty(FIELD_NAME_JOIN_TYPE) JoinRelType joinRelType,
             @JsonProperty(FIELD_NAME_SEARCH_COLUMNS) Map<Integer, 
FunctionParam> searchColumns,
             @JsonProperty(FIELD_NAME_TOP_K) FunctionParam topK,
+            @Nullable @JsonProperty(FIELD_NAME_RUNTIME_CONFIG) Map<String, 
String> runtimeConfig,
             @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType) {
         this.joinRelType = joinRelType;
         this.searchColumns = searchColumns;
         this.topK = topK;
+        this.runtimeConfig = runtimeConfig;
         this.outputType = outputType;
     }
 
@@ -77,6 +85,11 @@ public class VectorSearchSpec {
         return topK;
     }
 
+    /** Returns the runtime config held by this spec; may be {@code null}. */
+    @JsonIgnore
+    public @Nullable Map<String, String> getRuntimeConfig() {
+        return runtimeConfig;
+    }
+
     @JsonIgnore
     public RowType getOutputType() {
         return outputType;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
index a411452c099..f20729e0ebd 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream;
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -78,6 +79,7 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 /** Stream {@link ExecNode} for {@code VECTOR_SEARCH}. */
 @ExecNodeMetadata(
@@ -162,7 +164,11 @@ public class StreamExecVectorSearchTableFunction extends 
ExecNodeBase<RowData>
         UserDefinedFunction vectorSearchFunction =
                 findVectorSearchFunction(
                         VectorSearchUtil.createVectorSearchRuntimeProvider(
-                                searchTable, 
vectorSearchSpec.getSearchColumns().keySet()),
+                                searchTable,
+                                vectorSearchSpec.getSearchColumns().keySet(),
+                                Configuration.fromMap(
+                                        
Optional.ofNullable(vectorSearchSpec.getRuntimeConfig())
+                                                
.orElse(Collections.emptyMap()))),
                         isAsyncEnabled);
         UserDefinedFunctionHelper.prepareInstance(config, 
vectorSearchFunction);
         // 3. build the operator
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunctionRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunctionRule.java
index 0642e623c4e..34b84399d1e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunctionRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunctionRule.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.ml.PredictRuntimeProvider;
 import org.apache.flink.table.planner.calcite.RexModelCall;
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.planner.plan.utils.FunctionCallUtil;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 
@@ -39,14 +40,12 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.MapSqlType;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -159,28 +158,7 @@ public class StreamPhysicalMLPredictTableFunctionRule 
extends ConverterRule {
             throw new ValidationException(CONFIG_ERROR_MESSAGE);
         }
 
-        final List<RexNode> mapOperands = mapConstructorCall.getOperands();
-        final Map<String, String> runtimeConfig = new HashMap<>();
-
-        // Process key-value pairs
-        for (int i = 0; i < mapOperands.size(); i += 2) {
-            final RexNode keyNode = mapOperands.get(i);
-            final RexNode valueNode = mapOperands.get(i + 1);
-
-            // Both key and value should be string literals
-            if (!(keyNode instanceof RexLiteral) || !(valueNode instanceof 
RexLiteral)) {
-                throw new ValidationException(CONFIG_ERROR_MESSAGE);
-            }
-
-            final String key = RexLiteral.stringValue(keyNode);
-            final String value = RexLiteral.stringValue(valueNode);
-
-            if (key == null || value == null) {
-                throw new ValidationException("Config map keys and values 
cannot be null.");
-            }
-
-            runtimeConfig.put(key, value);
-        }
+        final Map<String, String> runtimeConfig = 
FunctionCallUtil.convert(mapConstructorCall);
 
         // Validate the configuration values
         validateRuntimeConfig(runtimeConfig);
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalVectorSearchTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalVectorSearchTableFunction.java
index 7a44bd767d4..b12b87a2c59 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalVectorSearchTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalVectorSearchTableFunction.java
@@ -47,7 +47,9 @@ import org.apache.calcite.rex.RexProgram;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 /** Stream physical RelNode for vector search table function. */
@@ -95,10 +97,7 @@ public class StreamPhysicalVectorSearchTableFunction extends 
SingleRel
     public RelWriter explainTerms(RelWriter pw) {
         List<String> columnToSearch =
                 vectorSearchSpec.getSearchColumns().keySet().stream()
-                        .map(
-                                calcProgram == null
-                                        ? 
searchTable.getRowType().getFieldNames()::get
-                                        : 
calcProgram.getOutputRowType().getFieldNames()::get)
+                        .map(searchTable.getRowType().getFieldNames()::get)
                         .collect(Collectors.toList());
         List<String> columnToQuery =
                 vectorSearchSpec.getSearchColumns().values().stream()
@@ -128,6 +127,10 @@ public class StreamPhysicalVectorSearchTableFunction 
extends SingleRel
                 .item("columnToSearch", String.join(", ", columnToSearch))
                 .item("columnToQuery", String.join(", ", columnToQuery))
                 .item("topK", topK)
+                .itemIf(
+                        "config",
+                        vectorSearchSpec.getRuntimeConfig(),
+                        vectorSearchSpec.getRuntimeConfig() != null)
                 .item("select", String.join(", ", leftSelect, rightSelect, 
"score"));
     }
 
@@ -151,8 +154,16 @@ public class StreamPhysicalVectorSearchTableFunction 
extends SingleRel
                 sourceSpec,
                 vectorSearchSpec,
                 VectorSearchUtil.isAsyncVectorSearch(
-                                searchTable, 
vectorSearchSpec.getSearchColumns().keySet())
-                        ? VectorSearchUtil.getAsyncOptions(tableConfig, 
getInputChangelogMode())
+                                searchTable,
+                                
Optional.ofNullable(vectorSearchSpec.getRuntimeConfig())
+                                        .orElse(Collections.emptyMap()),
+                                vectorSearchSpec.getSearchColumns().keySet())
+                        ? VectorSearchUtil.getMergedVectorSearchAsyncOptions(
+                                vectorSearchSpec.getRuntimeConfig() == null
+                                        ? Collections.emptyMap()
+                                        : vectorSearchSpec.getRuntimeConfig(),
+                                tableConfig,
+                                getInputChangelogMode())
                         : null,
                 InputProperty.DEFAULT,
                 FlinkTypeFactory.toLogicalRowType(outputRowType),
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalVectorSearchTableFunctionRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalVectorSearchTableFunctionRule.java
index 2d168df0441..853dfc0193f 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalVectorSearchTableFunctionRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalVectorSearchTableFunctionRule.java
@@ -173,10 +173,17 @@ public class StreamPhysicalVectorSearchTableFunctionRule
         FunctionCallUtil.Constant topKParam =
                 new 
FunctionCallUtil.Constant(FlinkTypeFactory.toLogicalType(topK.getType()), topK);
 
+        // Runtime Config
+        Map<String, String> runtimeConfig =
+                functionCall.getOperands().size() < 5
+                        ? null
+                        : FunctionCallUtil.convert((RexCall) 
functionCall.getOperands().get(4));
+
         return new VectorSearchSpec(
                 joinType,
                 searchColumns,
                 topKParam,
+                runtimeConfig,
                 FlinkTypeFactory.toLogicalRowType(scan.getRowType()));
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FunctionCallUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FunctionCallUtil.java
index aa3108f5684..18091c4ae13 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FunctionCallUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FunctionCallUtil.java
@@ -20,10 +20,12 @@ package org.apache.flink.table.planner.plan.utils;
 
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -33,11 +35,17 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSub
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
 
+import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.calcite.sql.SqlKind.MAP_VALUE_CONSTRUCTOR;
+
 /** Common utils for function call, e.g. ML_PREDICT and Lookup Join. */
 public abstract class FunctionCallUtil {
 
@@ -206,6 +214,29 @@ public abstract class FunctionCallUtil {
         return AsyncDataStream.OutputMode.ORDERED;
     }
 
+    /**
+     * Converts a MAP value constructor call into a plain string-to-string map.
+     *
+     * <p>The operands of the constructor are read as alternating key/value pairs. Every key and
+     * every value must be a non-null literal.
+     *
+     * @param mapConstructor the MAP value constructor call holding the config entries
+     * @return a new mutable map with one entry per key/value pair
+     * @throws IllegalArgumentException if the call is not a MAP value constructor or has an odd
+     *     number of operands
+     * @throws ValidationException if a key or value is not a literal, or is a NULL literal
+     */
+    public static Map<String, String> convert(RexCall mapConstructor) {
+        Preconditions.checkArgument(
+                mapConstructor.getOperator().getKind() == MAP_VALUE_CONSTRUCTOR,
+                "Input must be map constructor.");
+        List<RexNode> operands = mapConstructor.getOperands();
+        Preconditions.checkArgument(
+                operands.size() % 2 == 0, "Map constructor input must be even.");
+
+        Map<String, String> reducedConfig = new HashMap<>();
+        for (int i = 0; i < operands.size(); i += 2) {
+            RexNode keyNode = operands.get(i);
+            RexNode valueNode = operands.get(i + 1);
+            // Both key and value should be string literals
+            if (!(keyNode instanceof RexLiteral) || !(valueNode instanceof RexLiteral)) {
+                throw new ValidationException(
+                        "Config parameter should be a MAP data type consisting String literals.");
+            }
+            String key = RexLiteral.stringValue(keyNode);
+            String value = RexLiteral.stringValue(valueNode);
+            // stringValue yields null for a NULL literal; reject it here instead of handing a
+            // null key or value to the callers that build a Configuration from this map.
+            if (key == null || value == null) {
+                throw new ValidationException("Config map keys and values cannot be null.");
+            }
+            reducedConfig.put(key, value);
+        }
+        return reducedConfig;
+    }
+
     public static String explainFunctionParam(FunctionParam param, 
List<String> fieldNames) {
         if (param instanceof Constant) {
             return RelExplainUtil.literalToString(((Constant) param).literal);
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/VectorSearchUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/VectorSearchUtil.java
index cbb3eced6f2..ae8534ecd01 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/VectorSearchUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/VectorSearchUtil.java
@@ -19,10 +19,12 @@
 package org.apache.flink.table.planner.plan.utils;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.VectorSearchRuntimeConfigOptions;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.VectorSearchTableSource;
 import 
org.apache.flink.table.connector.source.search.AsyncVectorSearchFunctionProvider;
@@ -31,50 +33,103 @@ import 
org.apache.flink.table.planner.plan.schema.TableSourceTable;
 import 
org.apache.flink.table.runtime.connector.source.VectorSearchRuntimeProviderContext;
 
 import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.api.config.VectorSearchRuntimeConfigOptions.ASYNC_MAX_CONCURRENT_OPERATIONS;
+import static 
org.apache.flink.table.api.config.VectorSearchRuntimeConfigOptions.ASYNC_OUTPUT_MODE;
+import static 
org.apache.flink.table.api.config.VectorSearchRuntimeConfigOptions.ASYNC_TIMEOUT;
 
 /** Utils for {@code VECTOR_SEARCH}. */
 public class VectorSearchUtil extends FunctionCallUtil {
 
     public static boolean isAsyncVectorSearch(
-            TableSourceTable searchTable, Collection<Integer> searchColumns) {
+            TableSourceTable searchTable,
+            Map<String, String> runtimeConfig,
+            Collection<Integer> searchColumns) {
+        // Decides whether the vector search runs in async mode. The provider is created with
+        // the query-level runtime config, then checked for the async and sync provider
+        // interfaces. An explicit VectorSearchRuntimeConfigOptions.ASYNC value in the runtime
+        // config must be supported by the provider, otherwise a TableException is thrown.
+        Configuration queryConf = Configuration.fromMap(runtimeConfig);
+
+        boolean syncFound = false;
+        boolean asyncFound = false;
         VectorSearchTableSource.VectorSearchRuntimeProvider provider =
-                createVectorSearchRuntimeProvider(searchTable, searchColumns);
+                createVectorSearchRuntimeProvider(searchTable, searchColumns, 
queryConf);
         if (provider instanceof AsyncVectorSearchFunctionProvider) {
-            return true;
+            asyncFound = true;
         }
         if (provider instanceof VectorSearchFunctionProvider) {
+            syncFound = true;
+        }
+
+        // The provider implements neither of the two supported interfaces.
+        if (!asyncFound && !syncFound) {
+            throw new TableException(
+                    String.format(
+                            "Can not find valid implementation for search 
function for table %s.",
+                            
searchTable.contextResolvedTable().getIdentifier().asSummaryString()));
+        }
+
+        Optional<Boolean> requiredMode =
+                queryConf.getOptional(VectorSearchRuntimeConfigOptions.ASYNC);
+
+        // No explicit request in the runtime config: async is used whenever it is available.
+        if (!requiredMode.isPresent()) {
+            return asyncFound;
+        } else if (requiredMode.get()) {
+            if (!asyncFound) {
+                throw new TableException(
+                        String.format(
+                                "Require async mode, but vector search 
provider %s doesn't support async mode.",
+                                provider.getClass().getName()));
+            }
+            return true;
+        } else {
+            if (!syncFound) {
+                throw new TableException(
+                        String.format(
+                                "Require sync mode, but vector search provider 
%s doesn't support sync mode.",
+                                provider.getClass().getName()));
+            }
             return false;
         }
-        throw new TableException(
-                String.format(
-                        "Can not find valid implementation for search function 
for table %s.",
-                        
searchTable.contextResolvedTable().getIdentifier().asSummaryString()));
     }
 
     public static VectorSearchTableSource.VectorSearchRuntimeProvider
             createVectorSearchRuntimeProvider(
-                    TableSourceTable searchTable, Collection<Integer> 
searchColumns) {
+                    TableSourceTable searchTable,
+                    Collection<Integer> searchColumns,
+                    ReadableConfig runtimeConfig) {
+        // Asks the table source of searchTable for its search runtime provider. Each search
+        // column index is wrapped into a single-element int[] and handed to the provider
+        // context together with the given runtime config.
         int[][] indices = searchColumns.stream().map(i -> new int[] 
{i}).toArray(int[][]::new);
         VectorSearchTableSource tableSource = (VectorSearchTableSource) 
searchTable.tableSource();
         VectorSearchRuntimeProviderContext providerContext =
-                new VectorSearchRuntimeProviderContext(indices, new 
Configuration());
+                new VectorSearchRuntimeProviderContext(indices, runtimeConfig);
         return tableSource.getSearchRuntimeProvider(providerContext);
     }
 
-    public static AsyncOptions getAsyncOptions(
-            TableConfig config, ChangelogMode inputChangelogMode) {
+    /**
+     * Builds the {@link AsyncOptions} for an async vector search from the query-level runtime
+     * config and the table config.
+     *
+     * <p>For max concurrent operations, timeout and output mode, the runtime config value and
+     * the table config value are both passed to {@code coalesce}. NOTE(review): {@code coalesce}
+     * is defined outside this block; presumably it returns the first non-null argument, so the
+     * runtime config wins when set. Confirm against its definition.
+     *
+     * @param runtimeConfig runtime config entries of the VECTOR_SEARCH call
+     * @param config table config supplying the fallback values
+     * @param inputChangelogMode changelog mode of the input, used to derive the output mode
+     */
+    public static AsyncOptions getMergedVectorSearchAsyncOptions(
+            Map<String, String> runtimeConfig,
+            TableConfig config,
+            ChangelogMode inputChangelogMode) {
+        Configuration queryConf = Configuration.fromMap(runtimeConfig);
         int asyncBufferCapacity =
-                config.get(
-                        ExecutionConfigOptions
-                                
.TABLE_EXEC_ASYNC_VECTOR_SEARCH_MAX_CONCURRENT_OPERATIONS);
+                coalesce(
+                        queryConf.get(ASYNC_MAX_CONCURRENT_OPERATIONS),
+                        config.get(
+                                ExecutionConfigOptions
+                                        
.TABLE_EXEC_ASYNC_VECTOR_SEARCH_MAX_CONCURRENT_OPERATIONS));
         long asyncTimeout =
-                
config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_VECTOR_SEARCH_TIMEOUT)
+                coalesce(
+                                queryConf.get(ASYNC_TIMEOUT),
+                                config.get(
+                                        ExecutionConfigOptions
+                                                
.TABLE_EXEC_ASYNC_VECTOR_SEARCH_TIMEOUT))
                         .toMillis();
         AsyncDataStream.OutputMode asyncOutputMode =
                 convert(
                         inputChangelogMode,
-                        config.get(
-                                
ExecutionConfigOptions.TABLE_EXEC_ASYNC_VECTOR_SEARCH_OUTPUT_MODE));
+                        coalesce(
+                                queryConf.get(ASYNC_OUTPUT_MODE),
+                                config.get(
+                                        ExecutionConfigOptions
+                                                
.TABLE_EXEC_ASYNC_VECTOR_SEARCH_OUTPUT_MODE)));
+
         return new AsyncOptions(asyncBufferCapacity, asyncTimeout, false, 
asyncOutputMode);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 79387ab9285..9f05cafb49f 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -138,6 +138,7 @@ import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
@@ -2336,6 +2337,15 @@ public final class TestValuesTableFactory
 
         @Override
         public VectorSearchRuntimeProvider 
getSearchRuntimeProvider(VectorSearchContext context) {
+            if (context.runtimeConfig()
+                    .getOptional(TestValuesTableFactory.ENABLE_VECTOR_SEARCH)
+                    .isPresent()) {
+                Preconditions.checkArgument(
+                        
context.runtimeConfig().get(TestValuesTableFactory.ENABLE_VECTOR_SEARCH),
+                        String.format(
+                                "Require option %s true.",
+                                
TestValuesTableFactory.ENABLE_VECTOR_SEARCH.key()));
+            }
             int[] searchColumns =
                     Arrays.stream(context.getSearchColumns()).mapToInt(k -> 
k[0]).toArray();
             Collection<Row> rows =
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchRestoreTest.java
index 937d27baecc..ca59ddfca4f 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchRestoreTest.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.VectorSearchTestPrograms.ASYNC_VECTOR_SEARCH;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.VectorSearchTestPrograms.SYNC_VECTOR_SEARCH;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.stream.VectorSearchTestPrograms.VECTOR_SEARCH_WITH_RUNTIME_CONFIG;
 
 /** Restore tests for {@link StreamExecVectorSearchTableFunction}. */
 public class VectorSearchRestoreTest extends RestoreTestBase {
@@ -36,6 +37,7 @@ public class VectorSearchRestoreTest extends RestoreTestBase {
 
     @Override
     public List<TableTestProgram> programs() {
-        return Arrays.asList(SYNC_VECTOR_SEARCH, ASYNC_VECTOR_SEARCH);
+        // Programs returned here: sync, async, and the runtime-config VECTOR_SEARCH program.
+        return Arrays.asList(
+                SYNC_VECTOR_SEARCH, ASYNC_VECTOR_SEARCH, 
VECTOR_SEARCH_WITH_RUNTIME_CONFIG);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchTestPrograms.java
index 34265722782..d196840f917 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VectorSearchTestPrograms.java
@@ -104,4 +104,22 @@ public class VectorSearchTestPrograms {
                             "INSERT INTO sink_t SELECT id, content, label FROM 
src_t, LATERAL TABLE(\n"
                                     + "VECTOR_SEARCH(TABLE async_vector_table, 
DESCRIPTOR(vector), src_t.vector, 2))")
                     .build();
+
+    /**
+     * Program that calls VECTOR_SEARCH with named arguments against {@code async_vector_table}
+     * and passes {@code CONFIG => MAP['async', 'false']} as the runtime config.
+     */
+    public static final TableTestProgram VECTOR_SEARCH_WITH_RUNTIME_CONFIG =
+            TableTestProgram.of(
+                            "vector-search-with-runtime-config",
+                            "VECTOR_SEARCH with runtime config")
+                    .setupTableSource(SOURCE_TABLE)
+                    .setupTableSource(ASYNC_VECTOR_TABLE)
+                    .setupTableSink(SINK_TABLE)
+                    .runSql(
+                            "INSERT INTO sink_t SELECT id, content, label FROM 
src_t, LATERAL TABLE(\n"
+                                    + "VECTOR_SEARCH(\n"
+                                    + "  SEARCH_TABLE => TABLE 
async_vector_table,\n"
+                                    + "  COLUMN_TO_SEARCH => 
DESCRIPTOR(vector),\n"
+                                    + "  COLUMN_TO_QUERY => src_t.vector,\n"
+                                    + "  TOP_K => 2,\n"
+                                    + "  CONFIG => MAP['async', 'false']\n"
+                                    + "))")
+                    .build();
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java
index 0c447806d8f..526b92413a9 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java
@@ -314,7 +314,7 @@ public class MLPredictTableFunctionTest extends 
TableTestBase {
                                                 + "FROM TABLE(ML_PREDICT(TABLE 
MyTable, MODEL MyModel, DESCRIPTOR(a, b), MAP['async', 'true', 'capacity', 
CAST(-1 AS STRING)]))"))
                 .hasCauseInstanceOf(ValidationException.class)
                 .hasStackTraceContaining(
-                        "Config parameter of ML_PREDICT function should be a MAP data type consisting String literals.");
+                        "Config parameter should be a MAP data type consisting String literals.");
 
         assertThatThrownBy(
                         () ->
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.java
index 0021d2e920d..be7a2fadd2f 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.java
@@ -73,7 +73,8 @@ public class VectorSearchTableFunctionTest extends 
TableTestBase {
                                 + "  f BIGINT,\n"
                                 + "  g ARRAY<FLOAT>\n"
                                 + ") with (\n"
-                                + "  'connector' = 'values'\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'enable-vector-search' = 'true'"
                                 + ")");
 
         util.tableEnv()
@@ -84,7 +85,8 @@ public class VectorSearchTableFunctionTest extends 
TableTestBase {
                                 + "  g ARRAY<FLOAT>,\n"
                                 + "  proctime as PROCTIME()\n"
                                 + ") with (\n"
-                                + "  'connector' = 'values'\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'enable-vector-search' = 'true'"
                                 + ")");
 
         util.tableEnv()
@@ -153,6 +155,21 @@ public class VectorSearchTableFunctionTest extends 
TableTestBase {
         util.verifyRelPlan(sql);
     }
 
+    @Test
+    void testNamedArgumentWithRuntimeConfig() {
+        String sql =
+                "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+                        + "VECTOR_SEARCH(\n"
+                        + "    COLUMN_TO_QUERY => QueryTable.d,\n"
+                        + "    COLUMN_TO_SEARCH => DESCRIPTOR(`g`),\n"
+                        + "    TOP_K => 10,\n"
+                        + "    CONFIG => MAP['async', 'true', 'timeout', '100s'],\n"
+                        + "    SEARCH_TABLE => TABLE VectorTable\n"
+                        + "  )\n"
+                        + ")";
+        util.verifyRelPlan(sql);
+    }
+
     @Test
     void testNameConflicts() {
         util.tableEnv()
@@ -427,6 +444,79 @@ public class VectorSearchTableFunctionTest extends 
TableTestBase {
                 .satisfies(FlinkAssertions.anyCauseMatches("Unknown identifier 
'z'"));
     }
 
+    @Test
+    public void testIllegalRuntimeConfigType() {
+        String sql =
+                "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+                        + "VECTOR_SEARCH(\n"
+                        + "    TABLE VectorTable, DESCRIPTOR(`g`), QueryTable.d, 10, 10"
+                        + ")\n"
+                        + ")";
+        assertThatThrownBy(() -> util.verifyRelPlan(sql))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                ValidationException.class, "Config param should be a MAP."));
+    }
+
+    @Test
+    public void testIllegalConfigValue1() {
+        String sql =
+                "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+                        + "VECTOR_SEARCH(\n"
+                        + "    TABLE VectorTable, DESCRIPTOR(`g`), QueryTable.d, 10, MAP['async', 'yes']"
+                        + ")\n"
+                        + ")";
+        assertThatThrownBy(() -> util.verifyRelPlan(sql))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Unrecognized option for boolean: yes. Expected either true or false(case insensitive)"));
+    }
+
+    @Test
+    public void testIllegalConfigValue2() {
+        String sql =
+                "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+                        + "VECTOR_SEARCH(\n"
+                        + "    TABLE VectorTable, DESCRIPTOR(`g`), QueryTable.d, 10, MAP['async', 'true', 'max-concurrent-operations', '-1']"
+                        + ")\n"
+                        + ")";
+        assertThatThrownBy(() -> util.verifyRelPlan(sql))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                ValidationException.class,
+                                "Invalid runtime config option 'max-concurrent-operations'. Its value should be positive integer but was -1."));
+    }
+
+    @Test
+    public void testPreferAsync() {
+        String sql =
+                "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+                        + "VECTOR_SEARCH(\n"
+                        + "    TABLE VectorTable, DESCRIPTOR(`g`), QueryTable.d, 10, MAP['async', 'true']"
+                        + ")\n"
+                        + ")";
+        assertThatThrownBy(() -> util.verifyExecPlan(sql))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                TableException.class, "Require async mode"));
+    }
+
+    @Test
+    public void testUsingRuntimeConfigToAdjustConnectorParameter() {
+        String sql =
+                "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+                        + "VECTOR_SEARCH(\n"
+                        + "    TABLE VectorTable, DESCRIPTOR(`g`), QueryTable.d, 10, MAP['enable-vector-search', 'false']"
+                        + ")\n"
+                        + ")";
+        assertThatThrownBy(() -> util.verifyExecPlan(sql))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Require option enable-vector-search true."));
+    }
+
     public static class TestArrayUDF extends ScalarFunction {
         public Float[] eval(int i) {
             return new Float[] {(float) i};
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncVectorSearchITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncVectorSearchITCase.java
index 408c26685ba..f6429442931 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncVectorSearchITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncVectorSearchITCase.java
@@ -180,6 +180,19 @@ public class AsyncVectorSearchITCase extends 
StreamingWithStateTestBase {
                                 "Don't support calc on VECTOR_SEARCH node 
now."));
     }
 
+    @TestTemplate
+    void testRuntimeConfig() {
+        assertThatThrownBy(
+                        () ->
+                                CollectionUtil.iteratorToList(
+                                        tEnv().executeSql(
+                                                        "SELECT * FROM nullableSrc LEFT JOIN LATERAL TABLE(VECTOR_SEARCH(TABLE vector, DESCRIPTOR(`vector`), nullableSrc.vector, 2, MAP['timeout', '100ms'])) ON TRUE")
+                                                .collect()))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                TimeoutException.class, "Async function call has timed out."));
+    }
+
     @Parameters(name = "backend = {0}, objectReuse = {1}, asyncOutputMode = 
{2}")
     public static Collection<Object[]> parameters() {
         return Arrays.asList(
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.xml
index 2e2c21785bc..4cae7258fea 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.xml
@@ -115,6 +115,40 @@ Calc(select=[a, b, c, d, rowtime, 
PROCTIME_MATERIALIZE(proctime) AS proctime, e,
    +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
       +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
QueryTable]], fields=[a, b, c, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNamedArgumentWithRuntimeConfig">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM QueryTable, LATERAL TABLE(
+VECTOR_SEARCH(
+    COLUMN_TO_QUERY => QueryTable.d,
+    COLUMN_TO_SEARCH => DESCRIPTOR(`g`),
+    TOP_K => 10,
+    CONFIG => MAP['async', 'true', 'timeout', '100s'],
+    SEARCH_TABLE => TABLE VectorTable
+  )
+)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
e=[$6], f=[$7], g=[$8], score=[$9])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{3}])
+   :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
+   :  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, 
QueryTable]])
+   +- LogicalTableFunctionScan(invocation=[VECTOR_SEARCH(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'g'), $cor0.d, 10, MAP(_UTF-16LE'async':VARCHAR(7) 
CHARACTER SET "UTF-16LE", _UTF-16LE'true', _UTF-16LE'timeout':VARCHAR(7) 
CHARACTER SET "UTF-16LE", _UTF-16LE'100s'))], rowType=[RecordType(INTEGER e, 
BIGINT f, FLOAT ARRAY g, DOUBLE score)])
+      +- LogicalProject(e=[$0], f=[$1], g=[$2])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
VectorTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
e, f, g, score])
++- 
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTable], 
joinType=[InnerJoin], columnToSearch=[g], columnToQuery=[d], topK=[10], 
config=[{async=true, timeout=100s}], select=[a, b, c, d, rowtime, proctime, e, 
f, g, score])
+   +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
+      +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
QueryTable]], fields=[a, b, c, d, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -206,7 +240,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], 
rowtime=[$4], proctime=[$5], e=[$
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
e, f, g, h, score])
-+- 
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTableWithMetadata],
 joinType=[InnerJoin], columnToSearch=[g], columnToQuery=[d], topK=[10], 
select=[a, b, c, d, rowtime, proctime, e, f, g, +(e, 1) AS h, score])
++- 
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTableWithMetadata],
 joinType=[InnerJoin], columnToSearch=[f], columnToQuery=[d], topK=[10], 
select=[a, b, c, d, rowtime, proctime, e, f, g, +(e, 1) AS h, score])
    +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
       +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
QueryTable]], fields=[a, b, c, d, rowtime])
@@ -268,7 +302,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], 
rowtime=[$4], proctime=[$5], e=[$
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
e, f, g, h, score])
-+- 
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTableWithMetadata],
 joinType=[InnerJoin], columnToSearch=[f], columnToQuery=[d], topK=[10], 
select=[a, b, c, d, rowtime, proctime, e, f, g, +(e, 1) AS h, score])
++- 
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTableWithMetadata],
 joinType=[InnerJoin], columnToSearch=[g], columnToQuery=[d], topK=[10], 
select=[a, b, c, d, rowtime, proctime, e, f, g, +(e, 1) AS h, score])
    +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
       +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
QueryTable]], fields=[a, b, c, d, rowtime])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/vector-search-with-runtime-config/plan/vector-search-with-runtime-config.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/vector-search-with-runtime-config/plan/vector-search-with-runtime-config.json
new file mode 100644
index 00000000000..5d6dd13f9f0
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/vector-search-with-runtime-config/plan/vector-search-with-runtime-config.json
@@ -0,0 +1,174 @@
+{
+  "flinkVersion" : "2.2",
+  "nodes" : [ {
+    "id" : 9,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`src_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "id",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "content",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "vector",
+              "dataType" : "ARRAY<FLOAT>"
+            } ]
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector` ARRAY<FLOAT>>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, src_t]], fields=[id, content, vector])"
+  }, {
+    "id" : 10,
+    "type" : "stream-exec-vector-search-table-function_1",
+    "configuration" : {
+      "table.exec.async-vector-search.max-concurrent-operations" : "10",
+      "table.exec.async-vector-search.output-mode" : "ORDERED",
+      "table.exec.async-vector-search.timeout" : "3 min"
+    },
+    "tableSourceSpec" : {
+      "vectorSearchTableSource" : {
+        "table" : {
+          "identifier" : "`default_catalog`.`default_database`.`async_vector_table`",
+          "resolvedTable" : {
+            "schema" : {
+              "columns" : [ {
+                "name" : "label",
+                "dataType" : "VARCHAR(2147483647)"
+              }, {
+                "name" : "vector",
+                "dataType" : "ARRAY<FLOAT>"
+              } ]
+            }
+          }
+        }
+      },
+      "outputType" : "ROW<`label` VARCHAR(2147483647), `vector` ARRAY<FLOAT>> NOT NULL"
+    },
+    "vectorSearchSpec" : {
+      "joinType" : "INNER",
+      "searchColumns" : {
+        "1" : {
+          "type" : "FieldRef",
+          "index" : 2
+        }
+      },
+      "topK" : {
+        "type" : "Constant",
+        "sourceType" : "INT NOT NULL",
+        "literal" : {
+          "kind" : "LITERAL",
+          "value" : 2,
+          "type" : "INT NOT NULL"
+        }
+      },
+      "runtimeConfig" : {
+        "async" : "false"
+      },
+      "outputType" : "ROW<`label` VARCHAR(2147483647), `vector` ARRAY<FLOAT>, `score` DOUBLE NOT NULL>"
+    },
+    "asyncOptions" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `vector` ARRAY<FLOAT>, `label` VARCHAR(2147483647), `vector0` ARRAY<FLOAT>, `score` DOUBLE NOT NULL>",
+    "description" : "VectorSearchTableFunction(table=[default_catalog.default_database.async_vector_table], joinType=[InnerJoin], columnToSearch=[vector], columnToQuery=[vector], topK=[2], select=[id, content, vector, label, vector, score])"
+  }, {
+    "id" : 11,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `label` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[id, content, label])"
+  }, {
+    "id" : 12,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "id",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "content",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "label",
+              "dataType" : "VARCHAR(2147483647)"
+            } ]
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` BIGINT, `content` VARCHAR(2147483647), `label` 
VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[id, content, label])"
+  } ],
+  "edges" : [ {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/vector-search-with-runtime-config/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/vector-search-with-runtime-config/savepoint/_metadata
new file mode 100644
index 00000000000..ad2cb61d28d
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-vector-search-table-function_1/vector-search-with-runtime-config/savepoint/_metadata
 differ

Reply via email to