This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e4e6d7082e [FLINK-38425][table] Add rule to convert correlate node to 
vector search physical node (#27121)
0e4e6d7082e is described below

commit 0e4e6d7082e83f098d0c1a94351babb3ea407aa8
Author: Shengkai <[email protected]>
AuthorDate: Mon Oct 20 12:03:33 2025 +0800

    [FLINK-38425][table] Add rule to convert correlate node to vector search 
physical node (#27121)
---
 .../sql/ml/SqlVectorSearchTableFunction.java       |  18 +-
 .../plan/nodes/exec/spec/VectorSearchSpec.java     |  72 +++++
 .../StreamPhysicalVectorSearchTableFunction.java   | 144 +++++++++
 ...treamPhysicalVectorSearchTableFunctionRule.java | 352 +++++++++++++++++++++
 .../table/planner/plan/utils/FunctionCallUtil.java |   2 +-
 .../FlinkChangelogModeInferenceProgram.scala       |  18 +-
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |   1 +
 .../stream/sql/VectorSearchTableFunctionTest.java  | 225 ++++++++++++-
 .../stream/sql/VectorSearchTableFunctionTest.xml   | 148 ++++++++-
 9 files changed, 950 insertions(+), 30 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlVectorSearchTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlVectorSearchTableFunction.java
index a655efdf9f0..5499a076b2e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlVectorSearchTableFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlVectorSearchTableFunction.java
@@ -195,7 +195,7 @@ public class SqlVectorSearchTableFunction extends 
SqlFunction implements SqlTabl
                         throwOnFailure);
             }
 
-            // check topK is literal
+            // check top_k is a positive integer literal
             LogicalType topKType = 
toLogicalType(callBinding.getOperandType(3));
             if (!operands.get(3).getKind().equals(SqlKind.LITERAL)
                     || !topKType.is(LogicalTypeRoot.INTEGER)) {
@@ -203,11 +203,20 @@ public class SqlVectorSearchTableFunction extends 
SqlFunction implements SqlTabl
                         Optional.of(
                                 new ValidationException(
                                         String.format(
-                                                "Expect parameter topK is 
integer literal in VECTOR_SEARCH, but it is %s with type %s.",
+                                                "Expect parameter top_k is an 
INTEGER NOT NULL literal in VECTOR_SEARCH, but it is %s with type %s.",
                                                 operands.get(3), topKType))),
                         throwOnFailure);
             }
-
+            Integer topK = callBinding.getOperandLiteralValue(3, 
Integer.class);
+            if (topK == null || topK <= 0) {
+                return SqlValidatorUtils.throwExceptionOrReturnFalse(
+                        Optional.of(
+                                new ValidationException(
+                                        String.format(
+                                                "Parameter top_k must be 
greater than 0, but was %s.",
+                                                topK))),
+                        throwOnFailure);
+            }
             return true;
         }
 
@@ -218,7 +227,8 @@ public class SqlVectorSearchTableFunction extends 
SqlFunction implements SqlTabl
 
         @Override
         public String getAllowedSignatures(SqlOperator op, String opName) {
-            return opName + "(TABLE table_name, DESCRIPTOR(query_column), 
search_column, top_k)";
+            return opName
+                    + "(TABLE search_table, DESCRIPTOR(column_to_search), 
column_to_query, top_k)";
         }
 
         @Override
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/VectorSearchSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/VectorSearchSpec.java
new file mode 100644
index 00000000000..289e5f26f9b
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/VectorSearchSpec.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.spec;
+
+import 
org.apache.flink.table.planner.plan.utils.FunctionCallUtil.FunctionParam;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.calcite.rel.core.JoinRelType;
+
+import java.util.Map;
+
+/**
+ * VectorSearchSpec describes how vector search is performed: the join type, the mapping from the
+ * searched column to the query parameter, and the top-k parameter.
+ *
+ * <p>The Jackson annotations bind the fields and constructor arguments to the JSON property names
+ * declared by the {@code FIELD_NAME_*} constants; the getters are marked {@code JsonIgnore}.
+ */
+public class VectorSearchSpec {
+
+    public static final String FIELD_NAME_JOIN_TYPE = "joinType";
+    public static final String FIELD_NAME_SEARCH_COLUMNS = "searchColumns";
+    public static final String FIELD_NAME_TOP_K = "topK";
+
+    @JsonProperty(FIELD_NAME_JOIN_TYPE)
+    private final JoinRelType joinRelType;
+
+    /**
+     * KV: column_to_search -> column_to_query. The key is an integer column index and the value
+     * is the parameter describing what is looked up in that column.
+     */
+    @JsonProperty(FIELD_NAME_SEARCH_COLUMNS)
+    private final Map<Integer, FunctionParam> searchColumns;
+
+    @JsonProperty(FIELD_NAME_TOP_K)
+    private final FunctionParam topK;
+
+    @JsonCreator
+    public VectorSearchSpec(
+            @JsonProperty(FIELD_NAME_JOIN_TYPE) JoinRelType joinRelType,
+            @JsonProperty(FIELD_NAME_SEARCH_COLUMNS) Map<Integer, 
FunctionParam> searchColumns,
+            @JsonProperty(FIELD_NAME_TOP_K) FunctionParam topK) {
+        this.joinRelType = joinRelType;
+        this.searchColumns = searchColumns;
+        this.topK = topK;
+    }
+
+    @JsonIgnore
+    public JoinRelType getJoinType() {
+        return joinRelType;
+    }
+
+    @JsonIgnore
+    public Map<Integer, FunctionParam> getSearchColumns() {
+        return searchColumns;
+    }
+
+    @JsonIgnore
+    public FunctionParam getTopK() {
+        return topK;
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalVectorSearchTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalVectorSearchTableFunction.java
new file mode 100644
index 00000000000..e45da228d21
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalVectorSearchTableFunction.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream;
+
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.VectorSearchSpec;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.FunctionCallUtil;
+import org.apache.flink.table.planner.plan.utils.JoinTypeUtil;
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexProgram;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Stream physical RelNode for vector search table function. */
+public class StreamPhysicalVectorSearchTableFunction extends SingleRel
+        implements StreamPhysicalRel {
+
+    private final RelOptTable searchTable;
+    private final @Nullable RexProgram calcProgram;
+    private final VectorSearchSpec vectorSearchSpec;
+    private final RelDataType outputRowType;
+
+    public StreamPhysicalVectorSearchTableFunction(
+            RelOptCluster cluster,
+            RelTraitSet traits,
+            RelNode input,
+            RelOptTable searchTable,
+            @Nullable RexProgram calcProgram,
+            VectorSearchSpec vectorSearchSpec,
+            RelDataType outputRowType) {
+        super(cluster, traits, input);
+        this.searchTable = searchTable;
+        this.calcProgram = calcProgram;
+        this.vectorSearchSpec = vectorSearchSpec;
+        this.outputRowType = outputRowType;
+    }
+
+    @Override
+    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new StreamPhysicalVectorSearchTableFunction(
+                getCluster(),
+                traitSet,
+                inputs.get(0),
+                searchTable,
+                calcProgram,
+                vectorSearchSpec,
+                outputRowType);
+    }
+
+    @Override
+    protected RelDataType deriveRowType() {
+        return outputRowType;
+    }
+
+    @Override
+    public RelWriter explainTerms(RelWriter pw) {
+        List<String> columnToSearch =
+                vectorSearchSpec.getSearchColumns().keySet().stream()
+                        .map(
+                                calcProgram == null
+                                        ? 
searchTable.getRowType().getFieldNames()::get
+                                        : 
calcProgram.getOutputRowType().getFieldNames()::get)
+                        .collect(Collectors.toList());
+        List<String> columnToQuery =
+                vectorSearchSpec.getSearchColumns().values().stream()
+                        .map(this::explainQueryColumnParam)
+                        .collect(Collectors.toList());
+
+        Integer topK =
+                ((FunctionCallUtil.Constant) vectorSearchSpec.getTopK())
+                        .literal.getValueAs(Integer.class);
+
+        String leftSelect = String.join(", ", 
getInput(0).getRowType().getFieldNames());
+        String rightSelect =
+                calcProgram == null
+                        ? String.join(", ", 
searchTable.getRowType().getFieldNames())
+                        : RelExplainUtil.selectionToString(
+                                calcProgram,
+                                this::getExpressionString,
+                                RelExplainUtil.preferExpressionFormat(pw),
+                                
convertToExpressionDetail(pw.getDetailLevel()));
+
+        return super.explainTerms(pw)
+                .item(
+                        "table",
+                        ((TableSourceTable) searchTable)
+                                .contextResolvedTable()
+                                .getIdentifier()
+                                .asSummaryString())
+                .item("joinType", 
JoinTypeUtil.getFlinkJoinType(vectorSearchSpec.getJoinType()))
+                .item("columnToSearch", String.join(", ", columnToSearch))
+                .item("columnToQuery", String.join(", ", columnToQuery))
+                .item("topK", topK)
+                .item("select", String.join(", ", leftSelect, rightSelect, 
"score"));
+    }
+
+    @Override
+    public boolean requireWatermark() {
+        return false;
+    }
+
+    @Override
+    public ExecNode<?> translateToExecNode() {
+        throw new UnsupportedOperationException("Vector search not supported 
yet.");
+    }
+
+    /**
+     * Renders a query-column parameter for the plan explanation.
+     *
+     * <p>A {@link FunctionCallUtil.FieldRef} is shown as the name of the referenced field of the
+     * input row type, a {@link FunctionCallUtil.Constant} as the string form of its literal.
+     *
+     * @param param the parameter to render
+     * @return a printable description of the parameter; any other parameter kind falls back to
+     *     its own string form instead of {@code null}, so joining the results never emits a bare
+     *     "null" for an object that exists
+     */
+    private String explainQueryColumnParam(FunctionCallUtil.FunctionParam param) {
+        if (param instanceof FunctionCallUtil.FieldRef) {
+            int index = ((FunctionCallUtil.FieldRef) param).index;
+            return getInput(0).getRowType().getFieldNames().get(index);
+        }
+        if (param instanceof FunctionCallUtil.Constant) {
+            return ((FunctionCallUtil.Constant) param).literal.toString();
+        }
+        // Unknown parameter kind: describe it rather than returning null.
+        return String.valueOf(param);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalVectorSearchTableFunctionRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalVectorSearchTableFunctionRule.java
new file mode 100644
index 00000000000..ead37a92a6d
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalVectorSearchTableFunctionRule.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import 
org.apache.flink.table.planner.functions.sql.ml.SqlVectorSearchTableFunction;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.VectorSearchSpec;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalVectorSearchTableFunction;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.FunctionCallUtil;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.util.Util;
+import org.immutables.value.Value;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Rule to convert a {@link FlinkLogicalCorrelate} with VECTOR_SEARCH call 
into a {@link
+ * StreamPhysicalVectorSearchTableFunction}.
+ */
+@Value.Enclosing
+public class StreamPhysicalVectorSearchTableFunctionRule
+        extends RelRule<StreamPhysicalVectorSearchTableFunctionRule.Config> {
+
+    public static final StreamPhysicalVectorSearchTableFunctionRule INSTANCE =
+            Config.DEFAULT.toRule();
+
+    protected StreamPhysicalVectorSearchTableFunctionRule(Config config) {
+        super(config);
+    }
+
+    /**
+     * Accepts the match only when the table function scan (third operand) wraps a call whose
+     * operator is the VECTOR_SEARCH table function.
+     */
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        FlinkLogicalTableFunctionScan functionScan = call.rel(2);
+        RexNode scanCall = functionScan.getCall();
+        return scanCall instanceof RexCall
+                && ((RexCall) scanCall).getOperator() instanceof SqlVectorSearchTableFunction;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        // QUERY_TABLE
+        RelNode input = call.rel(1);
+        final RelNode newInput = RelOptRule.convert(input, 
FlinkConventions.STREAM_PHYSICAL());
+
+        // SEARCH_TABLE
+        FlinkLogicalCorrelate correlate = call.rel(0);
+        FlinkLogicalTableFunctionScan vectorSearchCall = call.rel(2);
+        String functionName = ((RexCall) 
vectorSearchCall.getCall()).getOperator().getName();
+        SearchTableExtractor extractor = new 
SearchTableExtractor(functionName);
+        extractor.visit(vectorSearchCall.getInput(0));
+
+        call.transformTo(
+                new StreamPhysicalVectorSearchTableFunction(
+                        correlate.getCluster(),
+                        
correlate.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL()),
+                        newInput,
+                        extractor.searchTable,
+                        extractor.calcProgram == null
+                                ? null
+                                : pullUpRexProgram(
+                                        input.getCluster(),
+                                        extractor.searchTable.getRowType(),
+                                        vectorSearchCall.getRowType(),
+                                        correlate
+                                                .getRowType()
+                                                .getFieldList()
+                                                .subList(
+                                                        
input.getRowType().getFieldCount(),
+                                                        
correlate.getRowType().getFieldCount()),
+                                        extractor.calcProgram),
+                        buildVectorSearchSpec(
+                                correlate, vectorSearchCall, 
extractor.searchTable, functionName),
+                        correlate.getRowType()));
+    }
+
+    private VectorSearchSpec buildVectorSearchSpec(
+            FlinkLogicalCorrelate correlate,
+            FlinkLogicalTableFunctionScan scan,
+            RelOptTable searchTable,
+            String functionName) {
+        JoinRelType joinType = correlate.getJoinType();
+        if (joinType != JoinRelType.INNER && joinType != JoinRelType.LEFT) {
+            throw new TableException(
+                    String.format(
+                            "%s only supports INNER JOIN and LEFT JOIN, but 
get %s JOIN.",
+                            functionName, joinType));
+        }
+
+        RexCall functionCall = (RexCall) scan.getCall();
+
+        // COLUMN_TO_SEARCH
+        RexCall descriptorCall = (RexCall) functionCall.getOperands().get(1);
+        RexNode searchColumn = descriptorCall.getOperands().get(0);
+        if (!(searchColumn instanceof RexLiteral)) {
+            throw new TableException(
+                    String.format(
+                            "%s got an unknown parameter column_to_search in 
descriptor: %s.",
+                            functionName, searchColumn));
+        }
+        int searchIndex =
+                searchTable
+                        .getRowType()
+                        .getFieldNames()
+                        .indexOf(RexLiteral.stringValue(searchColumn));
+        if (searchIndex == -1) {
+            throw new TableException(
+                    String.format(
+                            "%s can not find column `%s` in the search_table 
%s physical output type. Currently, Flink doesn't support to use computed 
column as the search column.",
+                            functionName,
+                            RexLiteral.stringValue(searchColumn),
+                            String.join(".", searchTable.getQualifiedName())));
+        }
+
+        // COLUMN_TO_QUERY
+        FunctionCallUtil.FunctionParam queryColumn =
+                getQueryColumnParam(functionCall.getOperands().get(2), 
correlate, functionName);
+
+        Map<Integer, FunctionCallUtil.FunctionParam> searchColumns = new 
LinkedHashMap<>();
+        searchColumns.put(searchIndex, queryColumn);
+
+        // TOP_K
+        RexLiteral topK = (RexLiteral) functionCall.getOperands().get(3);
+        FunctionCallUtil.Constant topKParam =
+                new 
FunctionCallUtil.Constant(FlinkTypeFactory.toLogicalType(topK.getType()), topK);
+
+        return new VectorSearchSpec(joinType, searchColumns, topKParam);
+    }
+
+    /**
+     * Converts the {@code column_to_query} operand of the function call into a {@link
+     * FunctionCallUtil.FunctionParam}.
+     *
+     * <p>Only a direct field access on the correlation variable of the given correlate is
+     * accepted; it is turned into a {@link FunctionCallUtil.FieldRef} carrying the index of the
+     * accessed field.
+     *
+     * @param queryColumn the operand expression to convert
+     * @param correlate the correlate node whose correlation id the operand must refer to
+     * @param functionName the function name used in error messages
+     * @return a field reference to the accessed field
+     * @throws TableException if the operand is not a field access, accesses a nested field, or
+     *     does not reference the correlation variable of {@code correlate}
+     */
+    private FunctionCallUtil.FunctionParam getQueryColumnParam(
+            RexNode queryColumn, FlinkLogicalCorrelate correlate, String functionName) {
+        if (!(queryColumn instanceof RexFieldAccess)) {
+            throw new TableException(
+                    String.format(
+                            "Expect function %s's parameter column_to_query is literal or field reference, but get expression %s. ",
+                            functionName, queryColumn));
+        }
+
+        RexFieldAccess fieldAccess = (RexFieldAccess) queryColumn;
+        RexNode refNode = fieldAccess.getReferenceExpr();
+        if (refNode instanceof RexFieldAccess) {
+            // nested field unsupported
+            throw new TableException(
+                    String.format(
+                            "%s does not support nested field in parameter column_to_query, but get %s.",
+                            functionName, queryColumn));
+        }
+        // Check the type before casting so that an unexpected reference expression is reported
+        // as a planner error instead of surfacing as a ClassCastException.
+        if (!(refNode instanceof RexCorrelVariable)
+                || !correlate.getCorrelationId().equals(((RexCorrelVariable) refNode).id)) {
+            throw new TableException(
+                    String.format(
+                            "This is a bug. Planner can not resolve the correlation in %s. Please file an issue.",
+                            functionName));
+        }
+        return new FunctionCallUtil.FieldRef(fieldAccess.getField().getIndex());
+    }
+
+    /**
+     * Pulls up the Calc under the VectorSearchCall.
+     *
+     * <p>Note: The vector search operator actually fetches the data first and then does the
+     * calculation, so the calc is pulled up to align with that behaviour.
+     */
+    private RexProgram pullUpRexProgram(
+            RelOptCluster cluster,
+            RelDataType scanOutputType,
+            RelDataType originFunctionCallType,
+            List<RelDataTypeField> correlateRightOutputTypes,
+            RexProgram originProgram) {
+        RelDataType searchOutputType =
+                cluster.getTypeFactory()
+                        .builder()
+                        .kind(scanOutputType.getStructKind())
+                        .addAll(scanOutputType.getFieldList())
+                        .add(Util.last(originFunctionCallType.getFieldList()))
+                        .build();
+        RelDataType newOutputType =
+                cluster.getTypeFactory()
+                        .builder()
+                        .kind(originProgram.getOutputRowType().getStructKind())
+                        .addAll(correlateRightOutputTypes)
+                        .build();
+        List<RexNode> exprs = new ArrayList<>(originProgram.getExprList());
+        exprs.add(
+                cluster.getRexBuilder()
+                        .makeInputRef(searchOutputType, 
searchOutputType.getFieldCount() - 1));
+        return RexProgramBuilder.create(
+                        cluster.getRexBuilder(),
+                        searchOutputType,
+                        exprs,
+                        originProgram.getProjectList(),
+                        originProgram.getCondition(),
+                        newOutputType,
+                        true,
+                        null)
+                .getProgram();
+    }
+
+    @Value.Immutable
+    public interface Config extends RelRule.Config {
+
+        Config DEFAULT =
+                
ImmutableStreamPhysicalVectorSearchTableFunctionRule.Config.builder()
+                        .build()
+                        .withOperandSupplier(
+                                b0 ->
+                                        b0.operand(FlinkLogicalCorrelate.class)
+                                                .inputs(
+                                                        b1 ->
+                                                                
b1.operand(FlinkLogicalRel.class)
+                                                                        
.anyInputs(),
+                                                        b2 ->
+                                                                b2.operand(
+                                                                               
 FlinkLogicalTableFunctionScan
+                                                                               
         .class)
+                                                                        
.anyInputs()))
+                        
.withDescription("StreamPhysicalVectorSearchTableFunctionRule");
+
+        @Override
+        default StreamPhysicalVectorSearchTableFunctionRule toRule() {
+            return new StreamPhysicalVectorSearchTableFunctionRule(this);
+        }
+    }
+
+    /**
+     * A utility class to extract table source and calc program.
+     *
+     * <p>Supported tree structure:
+     *
+     * <pre>{@code
+     * Calc(without filter) —— TableScan
+     * TableScan
+     * }</pre>
+     */
+    static class SearchTableExtractor {
+
+        enum NodeType {
+            CALC,
+            SCAN
+        }
+
+        @Nullable RexProgram calcProgram;
+        TableSourceTable searchTable;
+
+        private final String functionName;
+        private NodeType parentNode;
+
+        SearchTableExtractor(String functionName) {
+            this.functionName = functionName;
+        }
+
+        /**
+         * Walks the {@code search_table} input from the top, recording the optional Calc program
+         * in {@link #calcProgram} and the scanned table in {@link #searchTable}.
+         *
+         * <p>A {@link RelSubset} is unwrapped to its best (or original) node first. Recursion
+         * continues into the first input until a table scan is reached.
+         *
+         * @param rel the node to inspect
+         * @throws RelOptPlanner.CannotPlanException if a Calc is not the first node, the Calc has
+         *     a filter condition, the scanned table is not a {@link TableSourceTable}, or the
+         *     node is neither a Calc nor a TableScan
+         */
+        private void visit(RelNode rel) {
+            if (rel instanceof RelSubset) {
+                rel = ((RelSubset) rel).getBestOrOriginal();
+            }
+
+            NodeType currentNode = transform(rel);
+            switch (currentNode) {
+                case CALC:
+                    if (parentNode != null) {
+                        throw new RelOptPlanner.CannotPlanException(
+                                String.format(
+                                        "%s assumes calc to be the first node in parameter search_table, but it has a parent %s.",
+                                        functionName, parentNode));
+                    }
+                    calcProgram = ((Calc) rel).getProgram();
+                    if (calcProgram.getCondition() != null) {
+                        throw new RelOptPlanner.CannotPlanException(
+                                String.format(
+                                        "%s does not support filter on parameter search_table.",
+                                        functionName));
+                    }
+                    break;
+                case SCAN:
+                    // Report the type of the scanned table itself: the searchTable field is
+                    // still unset on this path, and the message needs the function name filled
+                    // in for its placeholder.
+                    RelOptTable table = ((TableScan) rel).getTable();
+                    if (!(table instanceof TableSourceTable)) {
+                        throw new RelOptPlanner.CannotPlanException(
+                                String.format(
+                                        "%s does not support search_table of type: %s",
+                                        functionName, table == null ? null : table.getClass()));
+                    }
+                    searchTable = (TableSourceTable) table;
+                    break;
+            }
+
+            parentNode = currentNode;
+            if (currentNode != NodeType.SCAN) {
+                visit(rel.getInput(0));
+            }
+        }
+
+        /**
+         * Classifies a node of the {@code search_table} input as {@link NodeType#CALC} or {@link
+         * NodeType#SCAN}.
+         *
+         * @throws RelOptPlanner.CannotPlanException if the node is neither a Calc nor a TableScan
+         */
+        private NodeType transform(RelNode node) {
+            if (node instanceof Calc) {
+                return NodeType.CALC;
+            }
+            if (node instanceof TableScan) {
+                return NodeType.SCAN;
+            }
+            throw new RelOptPlanner.CannotPlanException(
+                    String.format(
+                            "%s does not support %s node in parameter search_table.",
+                            functionName, node.getClass().getSimpleName()));
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FunctionCallUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FunctionCallUtil.java
index 490915ef1c7..962f7cd5e46 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FunctionCallUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FunctionCallUtil.java
@@ -194,7 +194,7 @@ public abstract class FunctionCallUtil {
         return t1 != null ? t1 : t2;
     }
 
-    protected static AsyncDataStream.OutputMode convert(
+    public static AsyncDataStream.OutputMode convert(
             ChangelogMode inputChangelogMode,
             ExecutionConfigOptions.AsyncOutputMode asyncOutputMode) {
         if (inputChangelogMode.containsOnly(RowKind.INSERT)
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index 8134ea7d137..237ba7edea1 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -341,15 +341,10 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
         createNewNode(rel, children, ModifyKindSetTrait.INSERT_ONLY, 
requiredTrait, requester)
 
-      case ml_predict: StreamPhysicalMLPredictTableFunction =>
-        // MLPredict supports only support consuming insert-only
-        val children = visitChildren(ml_predict, 
ModifyKindSetTrait.INSERT_ONLY)
-        createNewNode(
-          ml_predict,
-          children,
-          ModifyKindSetTrait.INSERT_ONLY,
-          requiredTrait,
-          requester)
+      case _: StreamPhysicalMLPredictTableFunction | _: 
StreamPhysicalVectorSearchTableFunction =>
+        // MLPredict and VectorSearch only support consuming insert-only input
+        val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
+        createNewNode(rel, children, ModifyKindSetTrait.INSERT_ONLY, 
requiredTrait, requester)
 
       case join: StreamPhysicalJoin =>
         // join support all changes in input
@@ -741,7 +736,8 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         case _: StreamPhysicalCorrelateBase | _: StreamPhysicalLookupJoin |
             _: StreamPhysicalExchange | _: StreamPhysicalExpand |
             _: StreamPhysicalMiniBatchAssigner | _: 
StreamPhysicalWatermarkAssigner |
-            _: StreamPhysicalWindowTableFunction | _: 
StreamPhysicalMLPredictTableFunction =>
+            _: StreamPhysicalWindowTableFunction | _: 
StreamPhysicalMLPredictTableFunction |
+            _: StreamPhysicalVectorSearchTableFunction =>
           // transparent forward requiredTrait to children
           visitChildren(rel, requiredUpdateTrait) match {
             case None => None
@@ -1130,7 +1126,7 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
             _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
             _: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
             _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin 
|
-            _: StreamPhysicalMLPredictTableFunction =>
+            _: StreamPhysicalMLPredictTableFunction | _: 
StreamPhysicalVectorSearchTableFunction =>
           // if not explicitly supported, all operators require full deletes 
if there are updates
           val children = rel.getInputs.map {
             case child: StreamPhysicalRel =>
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index d1d1f64b9d5..26b98c32a7f 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -488,6 +488,7 @@ object FlinkStreamRuleSets {
     StreamPhysicalProcessTableFunctionRule.INSTANCE,
     // model TVFs
     StreamPhysicalMLPredictTableFunctionRule.INSTANCE,
+    StreamPhysicalVectorSearchTableFunctionRule.INSTANCE,
     // join
     StreamPhysicalJoinRule.INSTANCE,
     StreamPhysicalMultiJoinRule.INSTANCE,
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.java
index 5d85e6b88eb..ef949677394 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.java
@@ -19,16 +19,27 @@
 package org.apache.flink.table.planner.plan.stream.sql;
 
 import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.factories.TestTimeTravelCatalog;
 import 
org.apache.flink.table.planner.functions.sql.ml.SqlVectorSearchTableFunction;
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
+import org.apache.flink.table.planner.utils.DateTimeTestUtil;
 import org.apache.flink.table.planner.utils.TableTestBase;
 import org.apache.flink.table.planner.utils.TableTestUtil;
 
+import org.apache.calcite.plan.RelOptPlanner;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.time.ZoneId;
+import java.util.Collections;
+
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link SqlVectorSearchTableFunction}. */
@@ -64,6 +75,29 @@ public class VectorSearchTableFunctionTest extends 
TableTestBase {
                                 + ") with (\n"
                                 + "  'connector' = 'values'\n"
                                 + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE VectorTableWithProctime (\n"
+                                + "  e INT,\n"
+                                + "  f BIGINT,\n"
+                                + "  g ARRAY<FLOAT>,\n"
+                                + "  proctime as PROCTIME()\n"
+                                + ") with (\n"
+                                + "  'connector' = 'values'\n"
+                                + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE VectorTableWithMetadata(\n"
+                                + "  e INT,\n"
+                                + "  f ARRAY<FLOAT> METADATA,\n"
+                                + "  g ARRAY<FLOAT>,\n"
+                                + "  h AS e + 1\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'readable-metadata' = 'f:ARRAY<FLOAT>'\n"
+                                + ")");
     }
 
     @Test
@@ -204,7 +238,7 @@ public class VectorSearchTableFunctionTest extends 
TableTestBase {
                 .satisfies(
                         FlinkAssertions.anyCauseMatches(
                                 ValidationException.class,
-                                "Expect parameter topK is integer literal in 
VECTOR_SEARCH, but it is 10.0 with type DECIMAL(3, 1) NOT NULL."));
+                                "Expect parameter top_k is an INTEGER NOT NULL 
literal in VECTOR_SEARCH, but it is 10.0 with type DECIMAL(3, 1) NOT NULL."));
     }
 
     @Test
@@ -219,6 +253,193 @@ public class VectorSearchTableFunctionTest extends 
TableTestBase {
                 .satisfies(
                         FlinkAssertions.anyCauseMatches(
                                 ValidationException.class,
-                                "Expect parameter topK is integer literal in 
VECTOR_SEARCH, but it is QueryTable.a with type INT."));
+                                "Expect parameter top_k is an INTEGER NOT NULL 
literal in VECTOR_SEARCH, but it is QueryTable.a with type INT."));
+    }
+
+    @Test
+    void testIllegalTopKValue3() {
+        String sql =
+                "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+                        + "VECTOR_SEARCH(\n"
+                        + "    TABLE VectorTable, DESCRIPTOR(`g`), 
QueryTable.d, 0"
+                        + ")\n"
+                        + ")";
+        assertThatThrownBy(() -> util.verifyRelPlan(sql))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                ValidationException.class,
+                                "Parameter top_k must be greater than 0, but 
was 0."));
+    }
+
+    @Test
+    void testSearchTableWithCalc() {
+        // calc -> source
+        util.verifyRelPlan(
+                "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+                        + "VECTOR_SEARCH(\n"
+                        + "    TABLE VectorTableWithProctime, DESCRIPTOR(`g`), 
QueryTable.d, 10))");
+    }
+
+    @Test
+    void testSearchTableWithProjection() {
+        util.tableEnv()
+                .executeSql(
+                        String.format(
+                                "CREATE FUNCTION add_one AS '%s'",
+                                
JavaUserDefinedScalarFunctions.JavaFunc0.class.getName()));
+        util.verifyRelPlan(
+                "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+                        + "VECTOR_SEARCH(\n"
+                        + "    (SELECT add_one(e) as e1, g, proctime FROM 
VectorTableWithProctime), DESCRIPTOR(`g`), QueryTable.d, 10))");
+    }
+
+    @Test
+    void testSearchTableWithMetadataTable() {
+        util.verifyRelPlan(
+                "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+                        + "  VECTOR_SEARCH(\n"
+                        + "    TABLE VectorTableWithMetadata,\n"
+                        + "    DESCRIPTOR(`g`),\n"
+                        + "    QueryTable.d,\n"
+                        + "    10"
+                        + "  )\n"
+                        + ")");
+    }
+
+    @Test
+    void testSearchTableWithDescriptorUsingMetadata() {
+        util.verifyRelPlan(
+                "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+                        + "  VECTOR_SEARCH(\n"
+                        + "    TABLE VectorTableWithMetadata,\n"
+                        + "    DESCRIPTOR(`f`),\n"
+                        + "    QueryTable.d,\n"
+                        + "    10"
+                        + "  )\n"
+                        + ")");
+    }
+
+    @Test
+    void testSearchTableUsingUDFComputedColumn() {
+        util.tableEnv()
+                .executeSql(
+                        String.format("CREATE FUNCTION udf AS '%s'", 
TestArrayUDF.class.getName()));
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE VectorTableWithComputedColumn (\n"
+                                + "  e INT NOT NULL,\n"
+                                + "  f BIGINT,\n"
+                                + "  g ARRAY<FLOAT>,\n"
+                                + "  h as udf(e)\n"
+                                + ") with (\n"
+                                + "  'connector' = 'values'\n"
+                                + ")");
+        assertThatThrownBy(
+                        () ->
+                                util.verifyRelPlan(
+                                        "SELECT * FROM QueryTable, LATERAL 
TABLE(\n"
+                                                + "  VECTOR_SEARCH(\n"
+                                                + "    TABLE 
VectorTableWithComputedColumn,\n"
+                                                + "    DESCRIPTOR(`h`),\n"
+                                                + "    QueryTable.d,\n"
+                                                + "    10"
+                                                + "  )\n"
+                                                + ")"))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                TableException.class,
+                                "VECTOR_SEARCH can not find column `h` in the 
search_table default_catalog.default_database.VectorTableWithComputedColumn 
physical output type. "
+                                        + "Currently, Flink doesn't support to 
use computed column as the search column."));
+    }
+
+    @Test
+    void testSearchTableWithSnapshot() throws Exception {
+        String catalogName = "ttc";
+        TestTimeTravelCatalog catalog = new TestTimeTravelCatalog(catalogName);
+        catalog.registerTableForTimeTravel(
+                "t1",
+                Schema.newBuilder()
+                        .column("f1", DataTypes.INT())
+                        .column("f2", DataTypes.ARRAY(DataTypes.DOUBLE()))
+                        .build(),
+                Collections.singletonMap("connector", "values"),
+                DateTimeTestUtil.toEpochMills(
+                        "2023-07-31 00:00:00", "yyyy-MM-dd HH:mm:ss", 
ZoneId.of("UTC")));
+
+        TableEnvironment tEnv = util.tableEnv();
+        tEnv.registerCatalog(catalogName, catalog);
+        tEnv.useCatalog(catalogName);
+        assertThatThrownBy(
+                        () ->
+                                util.verifyRelPlan(
+                                        "SELECT * FROM (select *, proctime() 
pts from t1) qt, LATERAL TABLE(\n"
+                                                + "VECTOR_SEARCH(\n"
+                                                + "    (SELECT * FROM t1 FOR 
SYSTEM_TIME AS OF qt.pts), DESCRIPTOR(`f2`), qt.f2, 10))"))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                RelOptPlanner.CannotPlanException.class,
+                                "VECTOR_SEARCH does not support 
FlinkLogicalSnapshot node in parameter search_table."));
+    }
+
+    @Test
+    void testSearchTableWithFilter() {
+        String sql =
+                "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+                        + "VECTOR_SEARCH(\n"
+                        + "    (SELECT * FROM VectorTable WHERE e > 0),\n"
+                        + "    DESCRIPTOR(`g`),\n"
+                        + "    QueryTable.d,\n"
+                        + "    10"
+                        + ")\n"
+                        + ")";
+        assertThatThrownBy(() -> util.verifyRelPlan(sql))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                RelOptPlanner.CannotPlanException.class,
+                                "VECTOR_SEARCH does not support filter on 
parameter search_table."));
+    }
+
+    @Test
+    void testSearchTableWithWatermark() {
+        // watermark assigner -> calc -> scan
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE IllegalTable (\n"
+                                + "  e INT,\n"
+                                + "  f BIGINT,\n"
+                                + "  g ARRAY<FLOAT>,\n"
+                                + "  rowtime TIMESTAMP(3),\n"
+                                + "  proctime as PROCTIME(),\n"
+                                + "  WATERMARK FOR rowtime AS rowtime - 
INTERVAL '1' SECOND\n"
+                                + ") with (\n"
+                                + "  'connector' = 'values'\n"
+                                + ")");
+        String sql =
+                "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+                        + "VECTOR_SEARCH(\n"
+                        + "    TABLE IllegalTable, DESCRIPTOR(`g`), 
QueryTable.d, 10))";
+        assertThatThrownBy(() -> util.verifyRelPlan(sql))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                RelOptPlanner.CannotPlanException.class,
+                                "VECTOR_SEARCH does not support 
FlinkLogicalWatermarkAssigner node in parameter search_table."));
+    }
+
+    @Test
+    void testSearchTableNonExistColumn() {
+        String sql =
+                "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+                        + "VECTOR_SEARCH(\n"
+                        + "    TABLE VectorTable, DESCRIPTOR(`z`), 
QueryTable.d, 10"
+                        + ")\n"
+                        + ")";
+        assertThatThrownBy(() -> util.verifyRelPlan(sql))
+                .satisfies(FlinkAssertions.anyCauseMatches("Unknown identifier 
'z'"));
+    }
+
+    public static class TestArrayUDF extends ScalarFunction {
+        public Float[] eval(int i) {
+            return new Float[] {(float) i};
+        }
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.xml
index 8aca81dc52d..0933534e171 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.xml
@@ -37,7 +37,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[$5], a0=[
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
a0, score, score0, score1, score2])
-+- Correlate(invocation=[VECTOR_SEARCH(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'score'), $cor0.d, 10)], 
correlate=[table(VECTOR_SEARCH(TABLE(),DESCRIPTOR('score'),$cor0.d,10))], 
select=[a,b,c,d,rowtime,proctime,a0,score,score0,score1,score2], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, FLOAT ARRAY d, 
TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, INTEGER 
a0, FLOAT ARRAY score, FLOAT ARRAY score0, FLOAT ARRAY score1, DOUBLE score2)], 
joinType=[INNER])
++- 
VectorSearchTableFunction(table=[default_catalog.default_database.NameConflictTable],
 joinType=[InnerJoin], columnToSearch=[score], columnToQuery=[d], topK=[10], 
select=[a, b, c, d, rowtime, proctime, a, score, score0, score1, score])
    +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
       +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
QueryTable]], fields=[a, b, c, d, rowtime])
@@ -69,18 +69,22 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], 
rowtime=[$4], proctime=[$5], e=[$
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
e, f, g, score])
-+- Correlate(invocation=[VECTOR_SEARCH(TABLE(#0), DESCRIPTOR(_UTF-16LE'g'), 
$cor0.d, 10)], 
correlate=[table(VECTOR_SEARCH(TABLE(),DESCRIPTOR('g'),$cor0.d,10))], 
select=[a,b,c,d,rowtime,proctime,e,f,g,score], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, FLOAT ARRAY d, TIMESTAMP(3) *ROWTIME* rowtime, 
TIMESTAMP_LTZ(3) *PROCTIME* proctime, INTEGER e, BIGINT f, FLOAT ARRAY g, 
DOUBLE score)], joinType=[INNER])
++- 
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTable], 
joinType=[InnerJoin], columnToSearch=[g], columnToQuery=[d], topK=[10], 
select=[a, b, c, d, rowtime, proctime, e, f, g, score])
    +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
       +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
QueryTable]], fields=[a, b, c, d, rowtime])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSimple">
+  <TestCase name="testOutOfOrderNamedArgument">
     <Resource name="sql">
       <![CDATA[SELECT * FROM QueryTable, LATERAL TABLE(
 VECTOR_SEARCH(
-    TABLE VectorTable, DESCRIPTOR(`g`), QueryTable.d, 10)
+    COLUMN_TO_QUERY => QueryTable.d,
+    COLUMN_TO_SEARCH => DESCRIPTOR(`g`),
+    TOP_K => 10,
+    SEARCH_TABLE => TABLE VectorTable
+  )
 )]]>
     </Resource>
     <Resource name="ast">
@@ -98,22 +102,142 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], 
rowtime=[$4], proctime=[$5], e=[$
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
e, f, g, score])
-+- Correlate(invocation=[VECTOR_SEARCH(TABLE(#0), DESCRIPTOR(_UTF-16LE'g'), 
$cor0.d, 10)], 
correlate=[table(VECTOR_SEARCH(TABLE(),DESCRIPTOR('g'),$cor0.d,10))], 
select=[a,b,c,d,rowtime,proctime,e,f,g,score], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, FLOAT ARRAY d, TIMESTAMP(3) *ROWTIME* rowtime, 
TIMESTAMP_LTZ(3) *PROCTIME* proctime, INTEGER e, BIGINT f, FLOAT ARRAY g, 
DOUBLE score)], joinType=[INNER])
++- 
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTable], 
joinType=[InnerJoin], columnToSearch=[g], columnToQuery=[d], topK=[10], 
select=[a, b, c, d, rowtime, proctime, e, f, g, score])
    +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
       +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
QueryTable]], fields=[a, b, c, d, rowtime])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testOutOfOrderNamedArgument">
+  <TestCase name="testSearchTableWithCalc">
     <Resource name="sql">
       <![CDATA[SELECT * FROM QueryTable, LATERAL TABLE(
 VECTOR_SEARCH(
-    COLUMN_TO_QUERY => QueryTable.d,
-    COLUMN_TO_SEARCH => DESCRIPTOR(`g`),
-    TOP_K => 10,
-    SEARCH_TABLE => TABLE VectorTable
-  )
+    TABLE VectorTableWithProctime, DESCRIPTOR(`g`), QueryTable.d, 10))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
e=[$6], f=[$7], g=[$8], proctime0=[$9], score=[$10])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{3}])
+   :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
+   :  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, 
QueryTable]])
+   +- LogicalTableFunctionScan(invocation=[VECTOR_SEARCH(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'g'), $cor0.d, 10)], rowType=[RecordType(INTEGER e, BIGINT 
f, FLOAT ARRAY g, TIMESTAMP_LTZ(3) *PROCTIME* proctime, DOUBLE score)])
+      +- LogicalProject(e=[$0], f=[$1], g=[$2], proctime=[$3])
+         +- LogicalProject(e=[$0], f=[$1], g=[$2], proctime=[PROCTIME()])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
VectorTableWithProctime]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
e, f, g, PROCTIME_MATERIALIZE(proctime0) AS proctime0, score])
++- 
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTableWithProctime],
 joinType=[InnerJoin], columnToSearch=[g], columnToQuery=[d], topK=[10], 
select=[a, b, c, d, rowtime, proctime, e, f, g, PROCTIME() AS proctime0, score])
+   +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
+      +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
QueryTable]], fields=[a, b, c, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSearchTableWithDescriptorUsingMetadata">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM QueryTable, LATERAL TABLE(
+  VECTOR_SEARCH(
+    TABLE VectorTableWithMetadata,
+    DESCRIPTOR(`f`),
+    QueryTable.d,
+    10  )
+)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
e=[$6], f=[$7], g=[$8], h=[$9], score=[$10])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{3}])
+   :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
+   :  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, 
QueryTable]])
+   +- LogicalTableFunctionScan(invocation=[VECTOR_SEARCH(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'f'), $cor0.d, 10)], rowType=[RecordType(INTEGER e, FLOAT 
ARRAY f, FLOAT ARRAY g, INTEGER h, DOUBLE score)])
+      +- LogicalProject(e=[$0], f=[$1], g=[$2], h=[$3])
+         +- LogicalProject(e=[$0], f=[$2], g=[$1], h=[+($0, 1)])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
VectorTableWithMetadata, metadata=[f]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
e, f, g, h, score])
++- 
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTableWithMetadata],
 joinType=[InnerJoin], columnToSearch=[g], columnToQuery=[d], topK=[10], 
select=[a, b, c, d, rowtime, proctime, e, f, g, +(e, 1) AS h, score])
+   +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
+      +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
QueryTable]], fields=[a, b, c, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSearchTableWithProjection">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM QueryTable, LATERAL TABLE(
+VECTOR_SEARCH(
+    (SELECT add_one(e) as e1, g, proctime FROM VectorTableWithProctime), 
DESCRIPTOR(`g`), QueryTable.d, 10))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
e1=[$6], g=[$7], proctime0=[$8], score=[$9])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{3}])
+   :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
+   :  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, 
QueryTable]])
+   +- LogicalTableFunctionScan(invocation=[VECTOR_SEARCH(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'g'), $cor0.d, 10)], rowType=[RecordType(BIGINT e1, FLOAT 
ARRAY g, TIMESTAMP_LTZ(3) *PROCTIME* proctime, DOUBLE score)])
+      +- LogicalProject(e1=[add_one(CAST($0):BIGINT)], g=[$2], proctime=[$3])
+         +- LogicalProject(e=[$0], f=[$1], g=[$2], proctime=[PROCTIME()])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
VectorTableWithProctime]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
e1, g, PROCTIME_MATERIALIZE(proctime0) AS proctime0, score])
++- 
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTableWithProctime],
 joinType=[InnerJoin], columnToSearch=[g], columnToQuery=[d], topK=[10], 
select=[a, b, c, d, rowtime, proctime, add_one(CAST(e AS BIGINT)) AS e1, g, 
PROCTIME() AS proctime0, score])
+   +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
+      +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
QueryTable]], fields=[a, b, c, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSearchTableWithMetadataTable">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM QueryTable, LATERAL TABLE(
+  VECTOR_SEARCH(
+    TABLE VectorTableWithMetadata,
+    DESCRIPTOR(`g`),
+    QueryTable.d,
+    10  )
+)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
e=[$6], f=[$7], g=[$8], h=[$9], score=[$10])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{3}])
+   :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
+   :  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, 
QueryTable]])
+   +- LogicalTableFunctionScan(invocation=[VECTOR_SEARCH(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'g'), $cor0.d, 10)], rowType=[RecordType(INTEGER e, FLOAT 
ARRAY f, FLOAT ARRAY g, INTEGER h, DOUBLE score)])
+      +- LogicalProject(e=[$0], f=[$1], g=[$2], h=[$3])
+         +- LogicalProject(e=[$0], f=[$2], g=[$1], h=[+($0, 1)])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
VectorTableWithMetadata, metadata=[f]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
e, f, g, h, score])
++- 
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTableWithMetadata],
 joinType=[InnerJoin], columnToSearch=[f], columnToQuery=[d], topK=[10], 
select=[a, b, c, d, rowtime, proctime, e, f, g, +(e, 1) AS h, score])
+   +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
+      +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
QueryTable]], fields=[a, b, c, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSimple">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM QueryTable, LATERAL TABLE(
+VECTOR_SEARCH(
+    TABLE VectorTable, DESCRIPTOR(`g`), QueryTable.d, 10)
 )]]>
     </Resource>
     <Resource name="ast">
@@ -131,7 +255,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], 
rowtime=[$4], proctime=[$5], e=[$
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
e, f, g, score])
-+- Correlate(invocation=[VECTOR_SEARCH(TABLE(#0), DESCRIPTOR(_UTF-16LE'g'), 
$cor0.d, 10)], 
correlate=[table(VECTOR_SEARCH(TABLE(),DESCRIPTOR('g'),$cor0.d,10))], 
select=[a,b,c,d,rowtime,proctime,e,f,g,score], rowType=[RecordType(INTEGER a, 
BIGINT b, VARCHAR(2147483647) c, FLOAT ARRAY d, TIMESTAMP(3) *ROWTIME* rowtime, 
TIMESTAMP_LTZ(3) *PROCTIME* proctime, INTEGER e, BIGINT f, FLOAT ARRAY g, 
DOUBLE score)], joinType=[INNER])
++- 
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTable], 
joinType=[InnerJoin], columnToSearch=[g], columnToQuery=[d], topK=[10], 
select=[a, b, c, d, rowtime, proctime, e, f, g, score])
    +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
       +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
QueryTable]], fields=[a, b, c, d, rowtime])

Reply via email to