This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0e4e6d7082e [FLINK-38425][table] Add rule to convert correlate node to
vector search physical node (#27121)
0e4e6d7082e is described below
commit 0e4e6d7082e83f098d0c1a94351babb3ea407aa8
Author: Shengkai <[email protected]>
AuthorDate: Mon Oct 20 12:03:33 2025 +0800
[FLINK-38425][table] Add rule to convert correlate node to vector search
physical node (#27121)
---
.../sql/ml/SqlVectorSearchTableFunction.java | 18 +-
.../plan/nodes/exec/spec/VectorSearchSpec.java | 72 +++++
.../StreamPhysicalVectorSearchTableFunction.java | 144 +++++++++
...treamPhysicalVectorSearchTableFunctionRule.java | 352 +++++++++++++++++++++
.../table/planner/plan/utils/FunctionCallUtil.java | 2 +-
.../FlinkChangelogModeInferenceProgram.scala | 18 +-
.../planner/plan/rules/FlinkStreamRuleSets.scala | 1 +
.../stream/sql/VectorSearchTableFunctionTest.java | 225 ++++++++++++-
.../stream/sql/VectorSearchTableFunctionTest.xml | 148 ++++++++-
9 files changed, 950 insertions(+), 30 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlVectorSearchTableFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlVectorSearchTableFunction.java
index a655efdf9f0..5499a076b2e 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlVectorSearchTableFunction.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlVectorSearchTableFunction.java
@@ -195,7 +195,7 @@ public class SqlVectorSearchTableFunction extends
SqlFunction implements SqlTabl
throwOnFailure);
}
- // check topK is literal
+ // check top_k is a positive integer literal
LogicalType topKType =
toLogicalType(callBinding.getOperandType(3));
if (!operands.get(3).getKind().equals(SqlKind.LITERAL)
|| !topKType.is(LogicalTypeRoot.INTEGER)) {
@@ -203,11 +203,20 @@ public class SqlVectorSearchTableFunction extends
SqlFunction implements SqlTabl
Optional.of(
new ValidationException(
String.format(
- "Expect parameter topK is
integer literal in VECTOR_SEARCH, but it is %s with type %s.",
+ "Expect parameter top_k is an
INTEGER NOT NULL literal in VECTOR_SEARCH, but it is %s with type %s.",
operands.get(3), topKType))),
throwOnFailure);
}
-
+ Integer topK = callBinding.getOperandLiteralValue(3,
Integer.class);
+ if (topK == null || topK <= 0) {
+ return SqlValidatorUtils.throwExceptionOrReturnFalse(
+ Optional.of(
+ new ValidationException(
+ String.format(
+ "Parameter top_k must be
greater than 0, but was %s.",
+ topK))),
+ throwOnFailure);
+ }
return true;
}
@@ -218,7 +227,8 @@ public class SqlVectorSearchTableFunction extends
SqlFunction implements SqlTabl
@Override
public String getAllowedSignatures(SqlOperator op, String opName) {
- return opName + "(TABLE table_name, DESCRIPTOR(query_column),
search_column, top_k)";
+ return opName
+ + "(TABLE search_table, DESCRIPTOR(column_to_search),
column_to_query, top_k)";
}
@Override
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/VectorSearchSpec.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/VectorSearchSpec.java
new file mode 100644
index 00000000000..289e5f26f9b
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/VectorSearchSpec.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.spec;
+
+import
org.apache.flink.table.planner.plan.utils.FunctionCallUtil.FunctionParam;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.calcite.rel.core.JoinRelType;
+
+import java.util.Map;
+
+/** VectorSearchSpec describes how vector search is performed. */
+public class VectorSearchSpec {
+
+ public static final String FIELD_NAME_JOIN_TYPE = "joinType";
+ public static final String FIELD_NAME_SEARCH_COLUMNS = "searchColumns";
+ public static final String FIELD_NAME_TOP_K = "topK";
+
+ @JsonProperty(FIELD_NAME_JOIN_TYPE)
+ private final JoinRelType joinRelType;
+
+ /** KV: column_to_search -> column_to_query. */
+ @JsonProperty(FIELD_NAME_SEARCH_COLUMNS)
+ private final Map<Integer, FunctionParam> searchColumns;
+
+ @JsonProperty(FIELD_NAME_TOP_K)
+ private final FunctionParam topK;
+
+ @JsonCreator
+ public VectorSearchSpec(
+ @JsonProperty(FIELD_NAME_JOIN_TYPE) JoinRelType joinRelType,
+ @JsonProperty(FIELD_NAME_SEARCH_COLUMNS) Map<Integer,
FunctionParam> searchColumns,
+ @JsonProperty(FIELD_NAME_TOP_K) FunctionParam topK) {
+ this.joinRelType = joinRelType;
+ this.searchColumns = searchColumns;
+ this.topK = topK;
+ }
+
+ @JsonIgnore
+ public JoinRelType getJoinType() {
+ return joinRelType;
+ }
+
+ @JsonIgnore
+ public Map<Integer, FunctionParam> getSearchColumns() {
+ return searchColumns;
+ }
+
+ @JsonIgnore
+ public FunctionParam getTopK() {
+ return topK;
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalVectorSearchTableFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalVectorSearchTableFunction.java
new file mode 100644
index 00000000000..e45da228d21
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalVectorSearchTableFunction.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream;
+
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.VectorSearchSpec;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.FunctionCallUtil;
+import org.apache.flink.table.planner.plan.utils.JoinTypeUtil;
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexProgram;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Stream physical RelNode for vector search table function. */
+public class StreamPhysicalVectorSearchTableFunction extends SingleRel
+ implements StreamPhysicalRel {
+
+ private final RelOptTable searchTable;
+ private final @Nullable RexProgram calcProgram;
+ private final VectorSearchSpec vectorSearchSpec;
+ private final RelDataType outputRowType;
+
+ public StreamPhysicalVectorSearchTableFunction(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelNode input,
+ RelOptTable searchTable,
+ @Nullable RexProgram calcProgram,
+ VectorSearchSpec vectorSearchSpec,
+ RelDataType outputRowType) {
+ super(cluster, traits, input);
+ this.searchTable = searchTable;
+ this.calcProgram = calcProgram;
+ this.vectorSearchSpec = vectorSearchSpec;
+ this.outputRowType = outputRowType;
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new StreamPhysicalVectorSearchTableFunction(
+ getCluster(),
+ traitSet,
+ inputs.get(0),
+ searchTable,
+ calcProgram,
+ vectorSearchSpec,
+ outputRowType);
+ }
+
+ @Override
+ protected RelDataType deriveRowType() {
+ return outputRowType;
+ }
+
+ @Override
+ public RelWriter explainTerms(RelWriter pw) {
+ List<String> columnToSearch =
+ vectorSearchSpec.getSearchColumns().keySet().stream()
+ .map(
+ calcProgram == null
+ ?
searchTable.getRowType().getFieldNames()::get
+ :
calcProgram.getOutputRowType().getFieldNames()::get)
+ .collect(Collectors.toList());
+ List<String> columnToQuery =
+ vectorSearchSpec.getSearchColumns().values().stream()
+ .map(this::explainQueryColumnParam)
+ .collect(Collectors.toList());
+
+ Integer topK =
+ ((FunctionCallUtil.Constant) vectorSearchSpec.getTopK())
+ .literal.getValueAs(Integer.class);
+
+ String leftSelect = String.join(", ",
getInput(0).getRowType().getFieldNames());
+ String rightSelect =
+ calcProgram == null
+ ? String.join(", ",
searchTable.getRowType().getFieldNames())
+ : RelExplainUtil.selectionToString(
+ calcProgram,
+ this::getExpressionString,
+ RelExplainUtil.preferExpressionFormat(pw),
+
convertToExpressionDetail(pw.getDetailLevel()));
+
+ return super.explainTerms(pw)
+ .item(
+ "table",
+ ((TableSourceTable) searchTable)
+ .contextResolvedTable()
+ .getIdentifier()
+ .asSummaryString())
+ .item("joinType",
JoinTypeUtil.getFlinkJoinType(vectorSearchSpec.getJoinType()))
+ .item("columnToSearch", String.join(", ", columnToSearch))
+ .item("columnToQuery", String.join(", ", columnToQuery))
+ .item("topK", topK)
+ .item("select", String.join(", ", leftSelect, rightSelect,
"score"));
+ }
+
+ @Override
+ public boolean requireWatermark() {
+ return false;
+ }
+
+ @Override
+ public ExecNode<?> translateToExecNode() {
+ throw new UnsupportedOperationException("Vector search not supported
yet.");
+ }
+
+ private String explainQueryColumnParam(FunctionCallUtil.FunctionParam
param) {
+ if (param instanceof FunctionCallUtil.FieldRef) {
+ int index = ((FunctionCallUtil.FieldRef) param).index;
+ return getInput(0).getRowType().getFieldNames().get(index);
+ } else if (param instanceof FunctionCallUtil.Constant) {
+ return ((FunctionCallUtil.Constant) param).literal.toString();
+ }
+ return null;
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalVectorSearchTableFunctionRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalVectorSearchTableFunctionRule.java
new file mode 100644
index 00000000000..ead37a92a6d
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalVectorSearchTableFunctionRule.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import
org.apache.flink.table.planner.functions.sql.ml.SqlVectorSearchTableFunction;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.VectorSearchSpec;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel;
+import
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalVectorSearchTableFunction;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.FunctionCallUtil;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.util.Util;
+import org.immutables.value.Value;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Rule to convert a {@link FlinkLogicalCorrelate} with VECTOR_SEARCH call
into a {@link
+ * StreamPhysicalVectorSearchTableFunction}.
+ */
[email protected]
+public class StreamPhysicalVectorSearchTableFunctionRule
+ extends RelRule<StreamPhysicalVectorSearchTableFunctionRule.Config> {
+
+ public static final StreamPhysicalVectorSearchTableFunctionRule INSTANCE =
+ Config.DEFAULT.toRule();
+
+ protected StreamPhysicalVectorSearchTableFunctionRule(Config config) {
+ super(config);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ FlinkLogicalTableFunctionScan scan = call.rel(2);
+ RexNode rexNode = scan.getCall();
+ if (!(rexNode instanceof RexCall)) {
+ return false;
+ }
+ RexCall rexCall = (RexCall) rexNode;
+ return rexCall.getOperator() instanceof SqlVectorSearchTableFunction;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ // QUERY_TABLE
+ RelNode input = call.rel(1);
+ final RelNode newInput = RelOptRule.convert(input,
FlinkConventions.STREAM_PHYSICAL());
+
+ // SEARCH_TABLE
+ FlinkLogicalCorrelate correlate = call.rel(0);
+ FlinkLogicalTableFunctionScan vectorSearchCall = call.rel(2);
+ String functionName = ((RexCall)
vectorSearchCall.getCall()).getOperator().getName();
+ SearchTableExtractor extractor = new
SearchTableExtractor(functionName);
+ extractor.visit(vectorSearchCall.getInput(0));
+
+ call.transformTo(
+ new StreamPhysicalVectorSearchTableFunction(
+ correlate.getCluster(),
+
correlate.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL()),
+ newInput,
+ extractor.searchTable,
+ extractor.calcProgram == null
+ ? null
+ : pullUpRexProgram(
+ input.getCluster(),
+ extractor.searchTable.getRowType(),
+ vectorSearchCall.getRowType(),
+ correlate
+ .getRowType()
+ .getFieldList()
+ .subList(
+
input.getRowType().getFieldCount(),
+
correlate.getRowType().getFieldCount()),
+ extractor.calcProgram),
+ buildVectorSearchSpec(
+ correlate, vectorSearchCall,
extractor.searchTable, functionName),
+ correlate.getRowType()));
+ }
+
+ private VectorSearchSpec buildVectorSearchSpec(
+ FlinkLogicalCorrelate correlate,
+ FlinkLogicalTableFunctionScan scan,
+ RelOptTable searchTable,
+ String functionName) {
+ JoinRelType joinType = correlate.getJoinType();
+ if (joinType != JoinRelType.INNER && joinType != JoinRelType.LEFT) {
+ throw new TableException(
+ String.format(
+ "%s only supports INNER JOIN and LEFT JOIN, but
get %s JOIN.",
+ functionName, joinType));
+ }
+
+ RexCall functionCall = (RexCall) scan.getCall();
+
+ // COLUMN_TO_SEARCH
+ RexCall descriptorCall = (RexCall) functionCall.getOperands().get(1);
+ RexNode searchColumn = descriptorCall.getOperands().get(0);
+ if (!(searchColumn instanceof RexLiteral)) {
+ throw new TableException(
+ String.format(
+ "%s got an unknown parameter column_to_search in
descriptor: %s.",
+ functionName, searchColumn));
+ }
+ int searchIndex =
+ searchTable
+ .getRowType()
+ .getFieldNames()
+ .indexOf(RexLiteral.stringValue(searchColumn));
+ if (searchIndex == -1) {
+ throw new TableException(
+ String.format(
+ "%s can not find column `%s` in the search_table
%s physical output type. Currently, Flink doesn't support to use computed
column as the search column.",
+ functionName,
+ RexLiteral.stringValue(searchColumn),
+ String.join(".", searchTable.getQualifiedName())));
+ }
+
+ // COLUMN_TO_QUERY
+ FunctionCallUtil.FunctionParam queryColumn =
+ getQueryColumnParam(functionCall.getOperands().get(2),
correlate, functionName);
+
+ Map<Integer, FunctionCallUtil.FunctionParam> searchColumns = new
LinkedHashMap<>();
+ searchColumns.put(searchIndex, queryColumn);
+
+ // TOP_K
+ RexLiteral topK = (RexLiteral) functionCall.getOperands().get(3);
+ FunctionCallUtil.Constant topKParam =
+ new
FunctionCallUtil.Constant(FlinkTypeFactory.toLogicalType(topK.getType()), topK);
+
+ return new VectorSearchSpec(joinType, searchColumns, topKParam);
+ }
+
+ private FunctionCallUtil.FunctionParam getQueryColumnParam(
+ RexNode queryColumn, FlinkLogicalCorrelate correlate, String
functionName) {
+ if (queryColumn instanceof RexFieldAccess) {
+ RexNode refNode = ((RexFieldAccess)
queryColumn).getReferenceExpr();
+ if (refNode instanceof RexFieldAccess) {
+ // nested field unsupported
+ throw new TableException(
+ String.format(
+ "%s does not support nested field in parameter
column_to_query, but get %s.",
+ functionName, queryColumn));
+ } else if
(!(correlate.getCorrelationId().equals(((RexCorrelVariable) refNode).id))) {
+ throw new TableException(
+ String.format(
+ "This is a bug. Planner can not resolve the
correlation in %s. Please file an issue.",
+ functionName));
+ }
+ return new FunctionCallUtil.FieldRef(
+ ((RexFieldAccess) queryColumn).getField().getIndex());
+ } else {
+ throw new TableException(
+ String.format(
+ "Expect function %s's parameter column_to_query is
literal or field reference, but get expression %s. ",
+ functionName, queryColumn));
+ }
+ }
+
+ /**
+ * Pull up the Calc under the VectorSearchCall.
+ *
+ * <p>Note: The vector search operator actually fetch the data and then do
the calculation. So
+ * pull up the calc is to align the behaviour.
+ */
+ private RexProgram pullUpRexProgram(
+ RelOptCluster cluster,
+ RelDataType scanOutputType,
+ RelDataType originFunctionCallType,
+ List<RelDataTypeField> correlateRightOutputTypes,
+ RexProgram originProgram) {
+ RelDataType searchOutputType =
+ cluster.getTypeFactory()
+ .builder()
+ .kind(scanOutputType.getStructKind())
+ .addAll(scanOutputType.getFieldList())
+ .add(Util.last(originFunctionCallType.getFieldList()))
+ .build();
+ RelDataType newOutputType =
+ cluster.getTypeFactory()
+ .builder()
+ .kind(originProgram.getOutputRowType().getStructKind())
+ .addAll(correlateRightOutputTypes)
+ .build();
+ List<RexNode> exprs = new ArrayList<>(originProgram.getExprList());
+ exprs.add(
+ cluster.getRexBuilder()
+ .makeInputRef(searchOutputType,
searchOutputType.getFieldCount() - 1));
+ return RexProgramBuilder.create(
+ cluster.getRexBuilder(),
+ searchOutputType,
+ exprs,
+ originProgram.getProjectList(),
+ originProgram.getCondition(),
+ newOutputType,
+ true,
+ null)
+ .getProgram();
+ }
+
+ @Value.Immutable
+ public interface Config extends RelRule.Config {
+
+ Config DEFAULT =
+
ImmutableStreamPhysicalVectorSearchTableFunctionRule.Config.builder()
+ .build()
+ .withOperandSupplier(
+ b0 ->
+ b0.operand(FlinkLogicalCorrelate.class)
+ .inputs(
+ b1 ->
+
b1.operand(FlinkLogicalRel.class)
+
.anyInputs(),
+ b2 ->
+ b2.operand(
+
FlinkLogicalTableFunctionScan
+
.class)
+
.anyInputs()))
+
.withDescription("StreamPhysicalVectorSearchTableFunctionRule");
+
+ @Override
+ default StreamPhysicalVectorSearchTableFunctionRule toRule() {
+ return new StreamPhysicalVectorSearchTableFunctionRule(this);
+ }
+ }
+
+ /**
+ * A utility class to extract table source and calc program.
+ *
+ * <p>Supported tree structure:
+ *
+ * <pre>{@code
+ * Calc(without filter) —— TableScan
+ * TableScan
+ * }</pre>
+ */
+ static class SearchTableExtractor {
+
+ enum NodeType {
+ CALC,
+ SCAN
+ }
+
+ @Nullable RexProgram calcProgram;
+ TableSourceTable searchTable;
+
+ private final String functionName;
+ private NodeType parentNode;
+
+ SearchTableExtractor(String functionName) {
+ this.functionName = functionName;
+ }
+
+ private void visit(RelNode rel) {
+ if (rel instanceof RelSubset) {
+ rel = ((RelSubset) rel).getBestOrOriginal();
+ }
+
+ NodeType currentNode = transform(rel);
+ switch (currentNode) {
+ case CALC:
+ if (parentNode != null) {
+ throw new RelOptPlanner.CannotPlanException(
+ String.format(
+ "%s assumes calc to be the first node
in parameter search_table, but it has a parent %s.",
+ functionName, parentNode));
+ }
+ calcProgram = ((Calc) rel).getProgram();
+ if (calcProgram.getCondition() != null) {
+ throw new RelOptPlanner.CannotPlanException(
+ String.format(
+ "%s does not support filter on
parameter search_table.",
+ functionName));
+ }
+ break;
+ case SCAN:
+ if (!(((TableScan) rel).getTable() instanceof
TableSourceTable)) {
+ throw new RelOptPlanner.CannotPlanException(
+ "%s does not support search_table of type: "
+ + searchTable.getClass());
+ }
+ searchTable = (TableSourceTable) ((TableScan)
rel).getTable();
+ break;
+ }
+
+ parentNode = currentNode;
+ if (currentNode != NodeType.SCAN) {
+ visit(rel.getInput(0));
+ }
+ }
+
+ private NodeType transform(RelNode node) {
+ NodeType transformed;
+ if (node instanceof Calc) {
+ transformed = NodeType.CALC;
+ } else if (node instanceof TableScan) {
+ transformed = NodeType.SCAN;
+ } else {
+ throw new RelOptPlanner.CannotPlanException(
+ String.format(
+ "%s does not support %s node in parameter
search_table.",
+ functionName,
node.getClass().getSimpleName()));
+ }
+ return transformed;
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FunctionCallUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FunctionCallUtil.java
index 490915ef1c7..962f7cd5e46 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FunctionCallUtil.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FunctionCallUtil.java
@@ -194,7 +194,7 @@ public abstract class FunctionCallUtil {
return t1 != null ? t1 : t2;
}
- protected static AsyncDataStream.OutputMode convert(
+ public static AsyncDataStream.OutputMode convert(
ChangelogMode inputChangelogMode,
ExecutionConfigOptions.AsyncOutputMode asyncOutputMode) {
if (inputChangelogMode.containsOnly(RowKind.INSERT)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index 8134ea7d137..237ba7edea1 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -341,15 +341,10 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
createNewNode(rel, children, ModifyKindSetTrait.INSERT_ONLY,
requiredTrait, requester)
- case ml_predict: StreamPhysicalMLPredictTableFunction =>
- // MLPredict supports only support consuming insert-only
- val children = visitChildren(ml_predict,
ModifyKindSetTrait.INSERT_ONLY)
- createNewNode(
- ml_predict,
- children,
- ModifyKindSetTrait.INSERT_ONLY,
- requiredTrait,
- requester)
+ case _: StreamPhysicalMLPredictTableFunction | _:
StreamPhysicalVectorSearchTableFunction =>
+ // MLPredict, VectorSearch supports only support consuming insert-only
+ val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
+ createNewNode(rel, children, ModifyKindSetTrait.INSERT_ONLY,
requiredTrait, requester)
case join: StreamPhysicalJoin =>
// join support all changes in input
@@ -741,7 +736,8 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
case _: StreamPhysicalCorrelateBase | _: StreamPhysicalLookupJoin |
_: StreamPhysicalExchange | _: StreamPhysicalExpand |
_: StreamPhysicalMiniBatchAssigner | _:
StreamPhysicalWatermarkAssigner |
- _: StreamPhysicalWindowTableFunction | _:
StreamPhysicalMLPredictTableFunction =>
+ _: StreamPhysicalWindowTableFunction | _:
StreamPhysicalMLPredictTableFunction |
+ _: StreamPhysicalVectorSearchTableFunction =>
// transparent forward requiredTrait to children
visitChildren(rel, requiredUpdateTrait) match {
case None => None
@@ -1130,7 +1126,7 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
_: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
_: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
_: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin
|
- _: StreamPhysicalMLPredictTableFunction =>
+ _: StreamPhysicalMLPredictTableFunction | _:
StreamPhysicalVectorSearchTableFunction =>
// if not explicitly supported, all operators require full deletes
if there are updates
val children = rel.getInputs.map {
case child: StreamPhysicalRel =>
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index d1d1f64b9d5..26b98c32a7f 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -488,6 +488,7 @@ object FlinkStreamRuleSets {
StreamPhysicalProcessTableFunctionRule.INSTANCE,
// model TVFs
StreamPhysicalMLPredictTableFunctionRule.INSTANCE,
+ StreamPhysicalVectorSearchTableFunctionRule.INSTANCE,
// join
StreamPhysicalJoinRule.INSTANCE,
StreamPhysicalMultiJoinRule.INSTANCE,
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.java
index 5d85e6b88eb..ef949677394 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.java
@@ -19,16 +19,27 @@
package org.apache.flink.table.planner.plan.stream.sql;
import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.factories.TestTimeTravelCatalog;
import
org.apache.flink.table.planner.functions.sql.ml.SqlVectorSearchTableFunction;
+import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
+import org.apache.flink.table.planner.utils.DateTimeTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.table.planner.utils.TableTestUtil;
+import org.apache.calcite.plan.RelOptPlanner;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.time.ZoneId;
+import java.util.Collections;
+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link SqlVectorSearchTableFunction}. */
@@ -64,6 +75,29 @@ public class VectorSearchTableFunctionTest extends
TableTestBase {
+ ") with (\n"
+ " 'connector' = 'values'\n"
+ ")");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE VectorTableWithProctime (\n"
+ + " e INT,\n"
+ + " f BIGINT,\n"
+ + " g ARRAY<FLOAT>,\n"
+ + " proctime as PROCTIME()\n"
+ + ") with (\n"
+ + " 'connector' = 'values'\n"
+ + ")");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE VectorTableWithMetadata(\n"
+ + " e INT,\n"
+ + " f ARRAY<FLOAT> METADATA,\n"
+ + " g ARRAY<FLOAT>,\n"
+ + " h AS e + 1\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'readable-metadata' = 'f:ARRAY<FLOAT>'\n"
+ + ")");
}
@Test
@@ -204,7 +238,7 @@ public class VectorSearchTableFunctionTest extends
TableTestBase {
.satisfies(
FlinkAssertions.anyCauseMatches(
ValidationException.class,
- "Expect parameter topK is integer literal in
VECTOR_SEARCH, but it is 10.0 with type DECIMAL(3, 1) NOT NULL."));
+ "Expect parameter top_k is an INTEGER NOT NULL
literal in VECTOR_SEARCH, but it is 10.0 with type DECIMAL(3, 1) NOT NULL."));
}
@Test
@@ -219,6 +253,193 @@ public class VectorSearchTableFunctionTest extends
TableTestBase {
.satisfies(
FlinkAssertions.anyCauseMatches(
ValidationException.class,
- "Expect parameter topK is integer literal in
VECTOR_SEARCH, but it is QueryTable.a with type INT."));
+ "Expect parameter top_k is an INTEGER NOT NULL
literal in VECTOR_SEARCH, but it is QueryTable.a with type INT."));
+ }
+
+ @Test
+ void testIllegalTopKValue3() {
+ String sql =
+ "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+ + "VECTOR_SEARCH(\n"
+ + " TABLE VectorTable, DESCRIPTOR(`g`),
QueryTable.d, 0"
+ + ")\n"
+ + ")";
+ assertThatThrownBy(() -> util.verifyRelPlan(sql))
+ .satisfies(
+ FlinkAssertions.anyCauseMatches(
+ ValidationException.class,
+ "Parameter top_k must be greater than 0, but
was 0."));
+ }
+
+ @Test
+ void testSearchTableWithCalc() {
+ // calc -> source
+ util.verifyRelPlan(
+ "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+ + "VECTOR_SEARCH(\n"
+ + " TABLE VectorTableWithProctime, DESCRIPTOR(`g`),
QueryTable.d, 10))");
+ }
+
+ @Test
+ void testSearchTableWithProjection() {
+ util.tableEnv()
+ .executeSql(
+ String.format(
+ "CREATE FUNCTION add_one AS '%s'",
+
JavaUserDefinedScalarFunctions.JavaFunc0.class.getName()));
+ util.verifyRelPlan(
+ "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+ + "VECTOR_SEARCH(\n"
+ + " (SELECT add_one(e) as e1, g, proctime FROM
VectorTableWithProctime), DESCRIPTOR(`g`), QueryTable.d, 10))");
+ }
+
+ @Test
+ void testSearchTableWithMetadataTable() {
+ util.verifyRelPlan(
+ "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+ + " VECTOR_SEARCH(\n"
+ + " TABLE VectorTableWithMetadata,\n"
+ + " DESCRIPTOR(`g`),\n"
+ + " QueryTable.d,\n"
+ + " 10"
+ + " )\n"
+ + ")");
+ }
+
+ @Test
+ void testSearchTableWithDescriptorUsingMetadata() {
+ util.verifyRelPlan(
+ "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+ + " VECTOR_SEARCH(\n"
+ + " TABLE VectorTableWithMetadata,\n"
+ + " DESCRIPTOR(`f`),\n"
+ + " QueryTable.d,\n"
+ + " 10"
+ + " )\n"
+ + ")");
+ }
+
+ @Test
+ void testSearchTableUsingUDFComputedColumn() {
+ util.tableEnv()
+ .executeSql(
+ String.format("CREATE FUNCTION udf AS '%s'",
TestArrayUDF.class.getName()));
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE VectorTableWithComputedColumn (\n"
+ + " e INT NOT NULL,\n"
+ + " f BIGINT,\n"
+ + " g ARRAY<FLOAT>,\n"
+ + " h as udf(e)\n"
+ + ") with (\n"
+ + " 'connector' = 'values'\n"
+ + ")");
+ assertThatThrownBy(
+ () ->
+ util.verifyRelPlan(
+ "SELECT * FROM QueryTable, LATERAL
TABLE(\n"
+ + " VECTOR_SEARCH(\n"
+ + " TABLE
VectorTableWithComputedColumn,\n"
+ + " DESCRIPTOR(`h`),\n"
+ + " QueryTable.d,\n"
+ + " 10"
+ + " )\n"
+ + ")"))
+ .satisfies(
+ FlinkAssertions.anyCauseMatches(
+ TableException.class,
+ "VECTOR_SEARCH can not find column `h` in the
search_table default_catalog.default_database.VectorTableWithComputedColumn
physical output type. "
+ + "Currently, Flink doesn't support to
use computed column as the search column."));
+ }
+
+ @Test
+ void testSearchTableWithSnapshot() throws Exception {
+ String catalogName = "ttc";
+ TestTimeTravelCatalog catalog = new TestTimeTravelCatalog(catalogName);
+ catalog.registerTableForTimeTravel(
+ "t1",
+ Schema.newBuilder()
+ .column("f1", DataTypes.INT())
+ .column("f2", DataTypes.ARRAY(DataTypes.DOUBLE()))
+ .build(),
+ Collections.singletonMap("connector", "values"),
+ DateTimeTestUtil.toEpochMills(
+ "2023-07-31 00:00:00", "yyyy-MM-dd HH:mm:ss",
ZoneId.of("UTC")));
+
+ TableEnvironment tEnv = util.tableEnv();
+ tEnv.registerCatalog(catalogName, catalog);
+ tEnv.useCatalog(catalogName);
+ assertThatThrownBy(
+ () ->
+ util.verifyRelPlan(
+ "SELECT * FROM (select *, proctime()
pts from t1) qt, LATERAL TABLE(\n"
+ + "VECTOR_SEARCH(\n"
+ + " (SELECT * FROM t1 FOR
SYSTEM_TIME AS OF qt.pts), DESCRIPTOR(`f2`), qt.f2, 10))"))
+ .satisfies(
+ FlinkAssertions.anyCauseMatches(
+ RelOptPlanner.CannotPlanException.class,
+ "VECTOR_SEARCH does not support
FlinkLogicalSnapshot node in parameter search_table."));
+ }
+
+ @Test
+ void testSearchTableWithFilter() {
+ String sql =
+ "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+ + "VECTOR_SEARCH(\n"
+ + " (SELECT * FROM VectorTable WHERE e > 0),\n"
+ + " DESCRIPTOR(`g`),\n"
+ + " QueryTable.d,\n"
+ + " 10"
+ + ")\n"
+ + ")";
+ assertThatThrownBy(() -> util.verifyRelPlan(sql))
+ .satisfies(
+ FlinkAssertions.anyCauseMatches(
+ RelOptPlanner.CannotPlanException.class,
+ "VECTOR_SEARCH does not support filter on
parameter search_table."));
+ }
+
+ @Test
+ void testSearchTableWithWatermark() {
+ // watermark assigner -> calc -> scan
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE IllegalTable (\n"
+ + " e INT,\n"
+ + " f BIGINT,\n"
+ + " g ARRAY<FLOAT>,\n"
+ + " rowtime TIMESTAMP(3),\n"
+ + " proctime as PROCTIME(),\n"
+ + " WATERMARK FOR rowtime AS rowtime -
INTERVAL '1' SECOND\n"
+ + ") with (\n"
+ + " 'connector' = 'values'\n"
+ + ")");
+ String sql =
+ "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+ + "VECTOR_SEARCH(\n"
+ + " TABLE IllegalTable, DESCRIPTOR(`g`),
QueryTable.d, 10))";
+ assertThatThrownBy(() -> util.verifyRelPlan(sql))
+ .satisfies(
+ FlinkAssertions.anyCauseMatches(
+ RelOptPlanner.CannotPlanException.class,
+ "VECTOR_SEARCH does not support
FlinkLogicalWatermarkAssigner node in parameter search_table."));
+ }
+
+ @Test
+ void testSearchTableNonExistColumn() {
+ String sql =
+ "SELECT * FROM QueryTable, LATERAL TABLE(\n"
+ + "VECTOR_SEARCH(\n"
+ + " TABLE VectorTable, DESCRIPTOR(`z`),
QueryTable.d, 10"
+ + ")\n"
+ + ")";
+ assertThatThrownBy(() -> util.verifyRelPlan(sql))
+ .satisfies(FlinkAssertions.anyCauseMatches("Unknown identifier
'z'"));
+ }
+
+ public static class TestArrayUDF extends ScalarFunction {
+ public Float[] eval(int i) {
+ return new Float[] {(float) i};
+ }
}
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.xml
index 8aca81dc52d..0933534e171 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/VectorSearchTableFunctionTest.xml
@@ -37,7 +37,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[$5], a0=[
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
a0, score, score0, score1, score2])
-+- Correlate(invocation=[VECTOR_SEARCH(TABLE(#0),
DESCRIPTOR(_UTF-16LE'score'), $cor0.d, 10)],
correlate=[table(VECTOR_SEARCH(TABLE(),DESCRIPTOR('score'),$cor0.d,10))],
select=[a,b,c,d,rowtime,proctime,a0,score,score0,score1,score2],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, FLOAT ARRAY d,
TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, INTEGER
a0, FLOAT ARRAY score, FLOAT ARRAY score0, FLOAT ARRAY score1, DOUBLE score2)],
joinType=[INNER])
++-
VectorSearchTableFunction(table=[default_catalog.default_database.NameConflictTable],
joinType=[InnerJoin], columnToSearch=[score], columnToQuery=[d], topK=[10],
select=[a, b, c, d, rowtime, proctime, a, score, score0, score1, score])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database,
QueryTable]], fields=[a, b, c, d, rowtime])
@@ -69,18 +69,22 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3],
rowtime=[$4], proctime=[$5], e=[$
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
e, f, g, score])
-+- Correlate(invocation=[VECTOR_SEARCH(TABLE(#0), DESCRIPTOR(_UTF-16LE'g'),
$cor0.d, 10)],
correlate=[table(VECTOR_SEARCH(TABLE(),DESCRIPTOR('g'),$cor0.d,10))],
select=[a,b,c,d,rowtime,proctime,e,f,g,score], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, FLOAT ARRAY d, TIMESTAMP(3) *ROWTIME* rowtime,
TIMESTAMP_LTZ(3) *PROCTIME* proctime, INTEGER e, BIGINT f, FLOAT ARRAY g,
DOUBLE score)], joinType=[INNER])
++-
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTable],
joinType=[InnerJoin], columnToSearch=[g], columnToQuery=[d], topK=[10],
select=[a, b, c, d, rowtime, proctime, e, f, g, score])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database,
QueryTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
- <TestCase name="testSimple">
+ <TestCase name="testOutOfOrderNamedArgument">
<Resource name="sql">
<![CDATA[SELECT * FROM QueryTable, LATERAL TABLE(
VECTOR_SEARCH(
- TABLE VectorTable, DESCRIPTOR(`g`), QueryTable.d, 10)
+ COLUMN_TO_QUERY => QueryTable.d,
+ COLUMN_TO_SEARCH => DESCRIPTOR(`g`),
+ TOP_K => 10,
+ SEARCH_TABLE => TABLE VectorTable
+ )
)]]>
</Resource>
<Resource name="ast">
@@ -98,22 +102,142 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3],
rowtime=[$4], proctime=[$5], e=[$
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
e, f, g, score])
-+- Correlate(invocation=[VECTOR_SEARCH(TABLE(#0), DESCRIPTOR(_UTF-16LE'g'),
$cor0.d, 10)],
correlate=[table(VECTOR_SEARCH(TABLE(),DESCRIPTOR('g'),$cor0.d,10))],
select=[a,b,c,d,rowtime,proctime,e,f,g,score], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, FLOAT ARRAY d, TIMESTAMP(3) *ROWTIME* rowtime,
TIMESTAMP_LTZ(3) *PROCTIME* proctime, INTEGER e, BIGINT f, FLOAT ARRAY g,
DOUBLE score)], joinType=[INNER])
++-
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTable],
joinType=[InnerJoin], columnToSearch=[g], columnToQuery=[d], topK=[10],
select=[a, b, c, d, rowtime, proctime, e, f, g, score])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database,
QueryTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
- <TestCase name="testOutOfOrderNamedArgument">
+ <TestCase name="testSearchTableWithCalc">
<Resource name="sql">
<![CDATA[SELECT * FROM QueryTable, LATERAL TABLE(
VECTOR_SEARCH(
- COLUMN_TO_QUERY => QueryTable.d,
- COLUMN_TO_SEARCH => DESCRIPTOR(`g`),
- TOP_K => 10,
- SEARCH_TABLE => TABLE VectorTable
- )
+ TABLE VectorTableWithProctime, DESCRIPTOR(`g`), QueryTable.d, 10))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
e=[$6], f=[$7], g=[$8], proctime0=[$9], score=[$10])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{3}])
+ :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
QueryTable]])
+ +- LogicalTableFunctionScan(invocation=[VECTOR_SEARCH(TABLE(#0),
DESCRIPTOR(_UTF-16LE'g'), $cor0.d, 10)], rowType=[RecordType(INTEGER e, BIGINT
f, FLOAT ARRAY g, TIMESTAMP_LTZ(3) *PROCTIME* proctime, DOUBLE score)])
+ +- LogicalProject(e=[$0], f=[$1], g=[$2], proctime=[$3])
+ +- LogicalProject(e=[$0], f=[$1], g=[$2], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
VectorTableWithProctime]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
e, f, g, PROCTIME_MATERIALIZE(proctime0) AS proctime0, score])
++-
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTableWithProctime],
joinType=[InnerJoin], columnToSearch=[g], columnToQuery=[d], topK=[10],
select=[a, b, c, d, rowtime, proctime, e, f, g, PROCTIME() AS proctime0, score])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+ +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
QueryTable]], fields=[a, b, c, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSearchTableWithDescriptorUsingMetadata">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM QueryTable, LATERAL TABLE(
+ VECTOR_SEARCH(
+ TABLE VectorTableWithMetadata,
+ DESCRIPTOR(`f`),
+ QueryTable.d,
+ 10 )
+)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
e=[$6], f=[$7], g=[$8], h=[$9], score=[$10])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{3}])
+ :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
QueryTable]])
+ +- LogicalTableFunctionScan(invocation=[VECTOR_SEARCH(TABLE(#0),
DESCRIPTOR(_UTF-16LE'f'), $cor0.d, 10)], rowType=[RecordType(INTEGER e, FLOAT
ARRAY f, FLOAT ARRAY g, INTEGER h, DOUBLE score)])
+ +- LogicalProject(e=[$0], f=[$1], g=[$2], h=[$3])
+ +- LogicalProject(e=[$0], f=[$2], g=[$1], h=[+($0, 1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
VectorTableWithMetadata, metadata=[f]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
e, f, g, h, score])
++-
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTableWithMetadata],
joinType=[InnerJoin], columnToSearch=[g], columnToQuery=[d], topK=[10],
select=[a, b, c, d, rowtime, proctime, e, f, g, +(e, 1) AS h, score])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+ +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
QueryTable]], fields=[a, b, c, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSearchTableWithProjection">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM QueryTable, LATERAL TABLE(
+VECTOR_SEARCH(
+ (SELECT add_one(e) as e1, g, proctime FROM VectorTableWithProctime),
DESCRIPTOR(`g`), QueryTable.d, 10))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
e1=[$6], g=[$7], proctime0=[$8], score=[$9])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{3}])
+ :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
QueryTable]])
+ +- LogicalTableFunctionScan(invocation=[VECTOR_SEARCH(TABLE(#0),
DESCRIPTOR(_UTF-16LE'g'), $cor0.d, 10)], rowType=[RecordType(BIGINT e1, FLOAT
ARRAY g, TIMESTAMP_LTZ(3) *PROCTIME* proctime, DOUBLE score)])
+ +- LogicalProject(e1=[add_one(CAST($0):BIGINT)], g=[$2], proctime=[$3])
+ +- LogicalProject(e=[$0], f=[$1], g=[$2], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
VectorTableWithProctime]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
e1, g, PROCTIME_MATERIALIZE(proctime0) AS proctime0, score])
++-
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTableWithProctime],
joinType=[InnerJoin], columnToSearch=[g], columnToQuery=[d], topK=[10],
select=[a, b, c, d, rowtime, proctime, add_one(CAST(e AS BIGINT)) AS e1, g,
PROCTIME() AS proctime0, score])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+ +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
QueryTable]], fields=[a, b, c, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSearchTableWithMetadataTable">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM QueryTable, LATERAL TABLE(
+ VECTOR_SEARCH(
+ TABLE VectorTableWithMetadata,
+ DESCRIPTOR(`g`),
+ QueryTable.d,
+ 10 )
+)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
e=[$6], f=[$7], g=[$8], h=[$9], score=[$10])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{3}])
+ :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
QueryTable]])
+ +- LogicalTableFunctionScan(invocation=[VECTOR_SEARCH(TABLE(#0),
DESCRIPTOR(_UTF-16LE'g'), $cor0.d, 10)], rowType=[RecordType(INTEGER e, FLOAT
ARRAY f, FLOAT ARRAY g, INTEGER h, DOUBLE score)])
+ +- LogicalProject(e=[$0], f=[$1], g=[$2], h=[$3])
+ +- LogicalProject(e=[$0], f=[$2], g=[$1], h=[+($0, 1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
VectorTableWithMetadata, metadata=[f]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
e, f, g, h, score])
++-
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTableWithMetadata],
joinType=[InnerJoin], columnToSearch=[f], columnToQuery=[d], topK=[10],
select=[a, b, c, d, rowtime, proctime, e, f, g, +(e, 1) AS h, score])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+ +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
QueryTable]], fields=[a, b, c, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSimple">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM QueryTable, LATERAL TABLE(
+VECTOR_SEARCH(
+ TABLE VectorTable, DESCRIPTOR(`g`), QueryTable.d, 10)
)]]>
</Resource>
<Resource name="ast">
@@ -131,7 +255,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3],
rowtime=[$4], proctime=[$5], e=[$
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
e, f, g, score])
-+- Correlate(invocation=[VECTOR_SEARCH(TABLE(#0), DESCRIPTOR(_UTF-16LE'g'),
$cor0.d, 10)],
correlate=[table(VECTOR_SEARCH(TABLE(),DESCRIPTOR('g'),$cor0.d,10))],
select=[a,b,c,d,rowtime,proctime,e,f,g,score], rowType=[RecordType(INTEGER a,
BIGINT b, VARCHAR(2147483647) c, FLOAT ARRAY d, TIMESTAMP(3) *ROWTIME* rowtime,
TIMESTAMP_LTZ(3) *PROCTIME* proctime, INTEGER e, BIGINT f, FLOAT ARRAY g,
DOUBLE score)], joinType=[INNER])
++-
VectorSearchTableFunction(table=[default_catalog.default_database.VectorTable],
joinType=[InnerJoin], columnToSearch=[g], columnToQuery=[d], topK=[10],
select=[a, b, c, d, rowtime, proctime, e, f, g, score])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database,
QueryTable]], fields=[a, b, c, d, rowtime])