This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2df2286ccd2 [FLINK-37791][table] Support to convert ML_PREDICT to
physical node(#26603)
2df2286ccd2 is described below
commit 2df2286ccd275ba351f2aa8158ac8f756206daeb
Author: Hao Li <[email protected]>
AuthorDate: Tue Jun 3 23:15:30 2025 -0700
[FLINK-37791][table] Support to convert ML_PREDICT to physical node(#26603)
---
.../flink/table/factories/FactoryUtilTest.java | 1 +
.../table/factories/TestModelProviderFactory.java | 11 +-
...ava => TestNonPredictModelProviderFactory.java} | 4 +-
.../org.apache.flink.table.factories.Factory | 1 +
.../java/org/apache/calcite/sql/SqlModelCall.java | 4 +
.../planner/calcite/FlinkCalciteSqlValidator.java | 4 +
.../planner/calcite/FlinkConvertletTable.java | 8 +
.../flink/table/planner/calcite/RexModelCall.java | 67 ++
.../table/planner/catalog/CatalogSchemaModel.java | 54 ++
.../StreamPhysicalMLPredictTableFunction.java | 77 ++
.../StreamPhysicalMLPredictTableFunctionRule.java | 84 ++
.../FlinkChangelogModeInferenceProgram.scala | 7 +-
.../planner/plan/rules/FlinkStreamRuleSets.scala | 4 +-
.../stream/sql/MLPredictTableFunctionTest.java | 92 +-
.../plan/stream/sql/MLPredictTableFunctionTest.xml | 944 +++++++++++++++++++++
15 files changed, 1343 insertions(+), 19 deletions(-)
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
index 5f7bfe61797..7b8237f6f84 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
@@ -760,6 +760,7 @@ class FactoryUtilTest {
+ "' in the classpath.\n\n"
+ "Available factory identifiers are:\n\n"
+ "conflicting-model\n"
+ + "non-predict-model\n"
+ "test-model");
}
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestModelProviderFactory.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestModelProviderFactory.java
index 725e25df24a..b4a2da061e8 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestModelProviderFactory.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestModelProviderFactory.java
@@ -22,12 +22,14 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.catalog.ResolvedCatalogModel;
import org.apache.flink.table.factories.FactoryUtil.ModelProviderFactoryHelper;
+import org.apache.flink.table.functions.PredictFunction;
import org.apache.flink.table.ml.ModelProvider;
+import org.apache.flink.table.ml.PredictRuntimeProvider;
import java.util.HashSet;
import java.util.Set;
-/** Test implementation for {@link ModelProviderFactory}. */
+/** Test implementation for {@link ModelProviderFactory} which creates
PredictRuntimeProvider. */
public final class TestModelProviderFactory implements ModelProviderFactory {
public static final String IDENTIFIER = "test-model";
@@ -79,7 +81,7 @@ public final class TestModelProviderFactory implements
ModelProviderFactory {
}
/** Test implementation of {@link ModelProvider} for testing purposes. */
- public static class TestModelProviderMock implements ModelProvider {
+ public static class TestModelProviderMock implements
PredictRuntimeProvider {
private final ResolvedCatalogModel catalogModel;
@@ -102,5 +104,10 @@ public final class TestModelProviderFactory implements
ModelProviderFactory {
}
return catalogModel.equals(((TestModelProviderMock)
o).catalogModel);
}
+
+ @Override
+ public PredictFunction createPredictFunction(Context context) {
+ throw new UnsupportedOperationException("To be implemented"); // test mock: no predict function is supplied, every call throws
+ }
}
}
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestModelProviderFactory.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestNonPredictModelProviderFactory.java
similarity index 95%
copy from
flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestModelProviderFactory.java
copy to
flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestNonPredictModelProviderFactory.java
index 725e25df24a..d975f0a264e 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestModelProviderFactory.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestNonPredictModelProviderFactory.java
@@ -28,9 +28,9 @@ import java.util.HashSet;
import java.util.Set;
/** Test implementation for {@link ModelProviderFactory}. */
-public final class TestModelProviderFactory implements ModelProviderFactory {
+public final class TestNonPredictModelProviderFactory implements
ModelProviderFactory {
- public static final String IDENTIFIER = "test-model";
+ public static final String IDENTIFIER = "non-predict-model";
public static final ConfigOption<String> TASK =
ConfigOptions.key("task")
diff --git
a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index bf2382c7310..3053d0f7814 100644
---
a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -24,6 +24,7 @@
org.apache.flink.table.factories.TestConflictingDynamicTableFactory2
org.apache.flink.table.factories.TestCatalogFactory
org.apache.flink.table.factories.TestCatalogStoreFactory
org.apache.flink.table.factories.TestModelProviderFactory
+org.apache.flink.table.factories.TestNonPredictModelProviderFactory
org.apache.flink.table.factories.TestConflictingModelProviderFactory1
org.apache.flink.table.factories.TestConflictingModelProviderFactory2
org.apache.flink.table.factories.module.DummyModuleFactory
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/SqlModelCall.java
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/SqlModelCall.java
index dc151792535..d0f160804c5 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/SqlModelCall.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/SqlModelCall.java
@@ -53,6 +53,10 @@ public class SqlModelCall extends SqlBasicCall {
return model.getOutputRowType(validator.getTypeFactory());
}
+ public CatalogSchemaModel getModel() { // exposes the wrapped catalog model; FlinkConvertletTable#convertModelCall calls toRex on it
+ return model;
+ }
+
/**
* A custom SqlOperator to handle model identifier.
*
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index 799cc9277e6..f63b368cbad 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -131,6 +131,10 @@ public final class FlinkCalciteSqlValidator extends
SqlValidatorImpl {
.get(TableConfigOptions.TABLE_COLUMN_EXPANSION_STRATEGY);
}
+ public RelOptCluster getRelOptCluster() { // exposes the validator's cluster; CatalogSchemaModel#toRex unwraps the FlinkContext from it
+ return relOptCluster;
+ }
+
public void setExpectedOutputType(SqlNode sqlNode, RelDataType
expectedOutputType) {
this.sqlNodeForExpectedOutputType = sqlNode;
this.expectedOutputType = expectedOutputType;
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
index c03c858efd6..569704f1d87 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
@@ -30,6 +30,7 @@ import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlModelCall;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
@@ -65,11 +66,18 @@ public class FlinkConvertletTable implements
SqlRexConvertletTable {
return this::convertTryCast;
} else if (operator instanceof SqlTableFunction) {
return this::convertTableArgs;
+ } else if (call instanceof SqlModelCall) {
+ return this::convertModelCall;
}
return StandardConvertletTable.INSTANCE.get(call);
}
+ private RexNode convertModelCall(SqlRexContext cx, final SqlCall call) { // convertlet selected by the `call instanceof SqlModelCall` branch above
+ SqlModelCall modelCall = (SqlModelCall) call; // cast is guarded by that instanceof check at the dispatch site
+ return modelCall.getModel().toRex(cx); // Rex construction is delegated to the CatalogSchemaModel
+ }
+
// Slightly modified version of StandardConvertletTable::convertCast
private RexNode convertTryCast(SqlRexContext cx, final SqlCall call) {
RelDataTypeFactory typeFactory = cx.getTypeFactory();
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexModelCall.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexModelCall.java
new file mode 100644
index 00000000000..2e71d217f70
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexModelCall.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.calcite;
+
+import org.apache.flink.table.catalog.ContextResolvedModel;
+import org.apache.flink.table.ml.ModelProvider;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlSpecialOperator;
+
+import java.util.List;
+
+/**
+ * A {@link RexCall} that represents a model reference in a plan and carries the model's {@link
+ * ContextResolvedModel} and {@link ModelProvider}.
+ *
+ * <p>This is used to represent calls to models in the Flink SQL planner.
+ *
+ * <p>The call has no operands and uses a {@link SqlSpecialOperator} named {@code Model} of kind
+ * {@link SqlKind#OTHER}. Its digest is {@code Model(MODEL <identifier summary>)}, followed by
+ * {@code :<full type string>} when the digest is computed with type.
+ */
+public class RexModelCall extends RexCall {
+
+ private final ModelProvider modelProvider;
+ private final ContextResolvedModel contextResolvedModel; // only read by computeDigest in this class
+
+ public RexModelCall(
+ RelDataType outputType,
+ ContextResolvedModel contextResolvedModel,
+ ModelProvider modelProvider) {
+ super(outputType, new SqlSpecialOperator("Model", SqlKind.OTHER),
List.of());
+ this.contextResolvedModel = contextResolvedModel;
+ this.modelProvider = modelProvider;
+ }
+
+ public ModelProvider getModelProvider() { // read by StreamPhysicalMLPredictTableFunctionRule#matches to check the provider type
+ return modelProvider;
+ }
+
+ @Override
+ protected String computeDigest(boolean withType) {
+ final StringBuilder sb = new StringBuilder(op.getName()); // operator name, i.e. "Model"
+ sb.append("(");
+ sb.append("MODEL ")
+ .append(contextResolvedModel.getIdentifier().asSummaryString())
+ .append(")");
+ if (withType) { // optionally suffix ":" plus the full type string
+ sb.append(":");
+ sb.append(type.getFullTypeString());
+ }
+ return sb.toString();
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaModel.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaModel.java
index c26f3a16224..945e674ed6f 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaModel.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaModel.java
@@ -18,19 +18,34 @@
package org.apache.flink.table.planner.catalog;
+import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ContextResolvedModel;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.ModelProviderFactory;
+import org.apache.flink.table.ml.ModelProvider;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
+import org.apache.flink.table.planner.calcite.FlinkContext;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.RexModelCall;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.types.PlannerTypeUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql2rel.SqlRexContext;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.flink.util.OptionalUtils.firstPresent;
+
/**
* Represents a wrapper for {@link ContextResolvedModel} in {@link
* org.apache.calcite.schema.Schema}.
@@ -75,6 +90,15 @@ public class CatalogSchemaModel {
return schemaToRelDataType(flinkTypeFactory, schema);
}
+ public RexNode toRex(SqlRexContext rexContext) { // builds the RexModelCall for this model, creating its ModelProvider on the way
+ FlinkCalciteSqlValidator validator = (FlinkCalciteSqlValidator)
rexContext.getValidator();
+ RelOptCluster cluster = validator.getRelOptCluster(); // unchecked cast above: assumes the context's validator is a FlinkCalciteSqlValidator
+ FlinkContext context = ShortcutUtils.unwrapContext(cluster);
+ ModelProvider modelProvider = createModelProvider(context,
contextResolvedModel);
+ return new RexModelCall( // NOTE(review): the model's *input* row type is passed as the constructor's `outputType` argument - confirm this is intended
+ getInputRowType(validator.getTypeFactory()),
contextResolvedModel, modelProvider);
+ }
+
private static RelDataType schemaToRelDataType(
FlinkTypeFactory typeFactory, ResolvedSchema schema) {
final List<String> fieldNames = schema.getColumnNames();
@@ -85,4 +109,34 @@ public class CatalogSchemaModel {
.collect(Collectors.toList());
return typeFactory.buildRelNodeRowType(fieldNames, fieldTypes);
}
+
+ private ModelProvider createModelProvider(
+ FlinkContext context, ContextResolvedModel catalogModel) { // resolves a ModelProviderFactory (catalog first, then modules) and asks FactoryUtil to create the provider
+
+ final Optional<ModelProviderFactory> factoryFromCatalog =
+ catalogModel
+ .getCatalog()
+ .flatMap(Catalog::getFactory)
+ .map( // Optional#map turns a null mapping result into an empty Optional, so a non-model factory is dropped here
+ f ->
+ f instanceof ModelProviderFactory
+ ? (ModelProviderFactory) f
+ : null);
+
+ final Optional<ModelProviderFactory> factoryFromModule =
+
context.getModuleManager().getFactory(Module::getModelProviderFactory);
+
+ // Since the catalog is more specific, we give it precedence over a
factory provided by any
+ // modules.
+ final ModelProviderFactory factory = // null when neither the catalog nor any module supplies a factory
+ firstPresent(factoryFromCatalog,
factoryFromModule).orElse(null);
+
+ return FactoryUtil.createModelProvider( // NOTE(review): reads the contextResolvedModel field below but the catalogModel parameter above; the only visible caller passes the field, so both refer to the same object there
+ factory,
+ contextResolvedModel.getIdentifier(),
+ contextResolvedModel.getResolvedModel(),
+ context.getTableConfig(),
+ context.getClassLoader(),
+ contextResolvedModel.isTemporary());
+ }
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunction.java
new file mode 100644
index 00000000000..faaf96f5dfe
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunction.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream;
+
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.type.RelDataType;
+
+import java.util.List;
+
+/**
+ * Stream physical RelNode for ml predict table function.
+ *
+ * <p>The node has a single input. It keeps the originating {@link FlinkLogicalTableFunctionScan}
+ * to print its call in {@link #explainTerms} and to carry it over in {@link #copy}, and it
+ * reports the row type handed to the constructor as its own row type.
+ */
+public class StreamPhysicalMLPredictTableFunction extends SingleRel implements
StreamPhysicalRel {
+
+ private final RelDataType outputRowType;
+ private final FlinkLogicalTableFunctionScan scan;
+
+ public StreamPhysicalMLPredictTableFunction(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelNode inputRel,
+ FlinkLogicalTableFunctionScan scan,
+ RelDataType outputRowType) {
+ super(cluster, traits, inputRel);
+ this.scan = scan;
+ this.outputRowType = outputRowType;
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new StreamPhysicalMLPredictTableFunction(
+ getCluster(), traitSet, inputs.get(0), scan, getRowType()); // single-input node: only inputs.get(0) is used
+ }
+
+ @Override
+ public boolean requireWatermark() {
+ return false;
+ }
+
+ @Override
+ public ExecNode<?> translateToExecNode() {
+ return null; // NOTE(review): no ExecNode translation is provided here; callers receive null
+ }
+
+ @Override
+ protected RelDataType deriveRowType() {
+ return outputRowType; // the row type is fixed at construction time
+ }
+
+ @Override
+ public RelWriter explainTerms(RelWriter pw) {
+ return super.explainTerms(pw)
+ .item("invocation", scan.getCall())
+ .item("rowType", getRowType());
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunctionRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunctionRule.java
new file mode 100644
index 00000000000..640c01e495d
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunctionRule.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream;
+
+import org.apache.flink.table.ml.AsyncPredictRuntimeProvider;
+import org.apache.flink.table.ml.PredictRuntimeProvider;
+import org.apache.flink.table.planner.calcite.RexModelCall;
+import
org.apache.flink.table.planner.functions.sql.ml.SqlMLPredictTableFunction;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexCall;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Rule to convert a {@link FlinkLogicalTableFunctionScan} with ml_predict call into a {@link
+ * StreamPhysicalMLPredictTableFunction}.
+ *
+ * <p>The rule only matches when the scan's call operator is a {@link SqlMLPredictTableFunction}
+ * and the model operand (operand index 1) carries a {@link PredictRuntimeProvider} or an {@link
+ * AsyncPredictRuntimeProvider}. The conversion moves the scan's first input to the stream
+ * physical convention and reuses the scan's row type for the new node.
+ */
+public class StreamPhysicalMLPredictTableFunctionRule extends ConverterRule {
+
+ public static final StreamPhysicalMLPredictTableFunctionRule INSTANCE =
+ new StreamPhysicalMLPredictTableFunctionRule(
+ Config.INSTANCE.withConversion(
+ FlinkLogicalTableFunctionScan.class,
+ FlinkConventions.LOGICAL(),
+ FlinkConventions.STREAM_PHYSICAL(),
+ "StreamPhysicalModelTableFunctionRule")); // NOTE(review): description string differs from the class name (...MLPredict...) - confirm which is intended
+
+ private StreamPhysicalMLPredictTableFunctionRule(Config config) {
+ super(config);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final FlinkLogicalTableFunctionScan scan = call.rel(0);
+ final RexCall rexCall = (RexCall) scan.getCall(); // unchecked cast: assumes the scan's call is a RexCall
+ if (!(rexCall.getOperator() instanceof SqlMLPredictTableFunction)) {
+ return false;
+ }
+
+ final RexModelCall modelCall = (RexModelCall) // unchecked cast: assumes operand 1 is the MODEL argument of ML_PREDICT
rexCall.getOperands().get(1);
+ return modelCall.getModelProvider() instanceof PredictRuntimeProvider
+ || modelCall.getModelProvider() instanceof
AsyncPredictRuntimeProvider;
+ }
+
+ @Override
+ public @Nullable RelNode convert(RelNode rel) {
+ final FlinkLogicalTableFunctionScan scan =
(FlinkLogicalTableFunctionScan) rel;
+ final RelNode newInput =
+ RelOptRule.convert(scan.getInput(0),
FlinkConventions.STREAM_PHYSICAL());
+
+ final RelTraitSet providedTraitSet =
+ rel.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL());
+
+ // TODO:
+ // Get model provider and context resolved model from RexModelCall
+ // Get table input from descriptor
+ // Get config from map
+ // Create ModelProviderSpec similar to DynamicTableSourceSpec and
TemporalTableSourceSpec
+ return new StreamPhysicalMLPredictTableFunction(
+ scan.getCluster(), providedTraitSet, newInput, scan,
scan.getRowType());
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index e84edf8ad2e..9e6bcd119ee 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -376,7 +376,7 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
case _: StreamPhysicalCalcBase | _: StreamPhysicalCorrelateBase |
_: StreamPhysicalLookupJoin | _: StreamPhysicalExchange | _:
StreamPhysicalExpand |
_: StreamPhysicalMiniBatchAssigner | _:
StreamPhysicalWatermarkAssigner |
- _: StreamPhysicalWindowTableFunction =>
+ _: StreamPhysicalWindowTableFunction | _:
StreamPhysicalMLPredictTableFunction =>
// transparent forward requiredTrait to children
val children = visitChildren(rel, requiredTrait, requester)
val childrenTrait =
children.head.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
@@ -716,7 +716,7 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
case _: StreamPhysicalCorrelateBase | _: StreamPhysicalLookupJoin |
_: StreamPhysicalExchange | _: StreamPhysicalExpand |
_: StreamPhysicalMiniBatchAssigner | _:
StreamPhysicalWatermarkAssigner |
- _: StreamPhysicalWindowTableFunction =>
+ _: StreamPhysicalWindowTableFunction | _:
StreamPhysicalMLPredictTableFunction =>
// transparent forward requiredTrait to children
visitChildren(rel, requiredUpdateTrait) match {
case None => None
@@ -1083,7 +1083,8 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
_: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate |
_: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
_: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
- _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin
=>
+ _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin
|
+ _: StreamPhysicalMLPredictTableFunction =>
// if not explicitly supported, all operators require full deletes
if there are updates
val children = rel.getInputs.map {
case child: StreamPhysicalRel =>
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 74219150ed0..f24433deb4b 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.planner.plan.rules
import org.apache.flink.table.planner.plan.nodes.logical._
-import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalProcessTableFunctionRule
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalMLPredictTableFunctionRule,
StreamPhysicalProcessTableFunctionRule}
import org.apache.flink.table.planner.plan.rules.logical._
import
org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule
import org.apache.flink.table.planner.plan.rules.physical.stream._
@@ -470,6 +470,8 @@ object FlinkStreamRuleSets {
StreamPhysicalWindowDeduplicateRule.INSTANCE,
// process table function
StreamPhysicalProcessTableFunctionRule.INSTANCE,
+ // model TVFs
+ StreamPhysicalMLPredictTableFunctionRule.INSTANCE,
// join
StreamPhysicalJoinRule.INSTANCE,
StreamPhysicalIntervalJoinRule.INSTANCE,
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java
index 20bf939a807..9a786d9d009 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.stream.sql;
import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.table.planner.utils.TableTestUtil;
@@ -64,7 +65,11 @@ public class MLPredictTableFunctionTest extends
TableTestBase {
+ "INPUT (a INT, b BIGINT)\n"
+ "OUTPUT(e STRING, f ARRAY<INT>)\n"
+ "with (\n"
- + " 'provider' = 'openai'\n"
+ + " 'provider' = 'test-model',\n" // test
model provider defined in
+ // TestModelProviderFactory in
+ // flink-table-common
+ + " 'endpoint' = 'someendpoint',\n"
+ + " 'task' = 'text_generation'\n"
+ ")");
}
@@ -75,7 +80,7 @@ public class MLPredictTableFunctionTest extends TableTestBase
{
+ "FROM TABLE(ML_PREDICT(INPUT => TABLE MyTable, "
+ "MODEL => MODEL MyModel, "
+ "ARGS => DESCRIPTOR(a, b)))";
- assertReachesRelConverter(sql);
+ util.verifyRelPlan(sql);
}
@Test
@@ -86,7 +91,7 @@ public class MLPredictTableFunctionTest extends TableTestBase
{
+ "MODEL => MODEL MyModel, "
+ "ARGS => DESCRIPTOR(a, b),"
+ "CONFIG => MAP['key', 'value']))";
- assertReachesRelConverter(sql);
+ util.verifyRelPlan(sql);
}
@Test
@@ -94,7 +99,7 @@ public class MLPredictTableFunctionTest extends TableTestBase
{
String sql =
"SELECT *\n"
+ "FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL MyModel,
DESCRIPTOR(a, b)))";
- assertReachesRelConverter(sql);
+ util.verifyRelPlan(sql);
}
@Test
@@ -103,7 +108,7 @@ public class MLPredictTableFunctionTest extends
TableTestBase {
String sql =
"SELECT *\n"
+ "FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL MyModel,
DESCRIPTOR(a, b), MAP['async', 'true', 'timeout', '100s']))";
- assertReachesRelConverter(sql);
+ util.verifyRelPlan(sql);
}
@Test
@@ -142,13 +147,15 @@ public class MLPredictTableFunctionTest extends
TableTestBase {
+ "INPUT (a INT, b BIGINT)\n"
+ "OUTPUT(c STRING, d ARRAY<INT>)\n"
+ "with (\n"
- + " 'provider' = 'openai'\n"
+ + " 'task' = 'text_generation',\n"
+ + " 'endpoint' = 'someendpoint',\n"
+ + " 'provider' = 'test-model'"
+ ")");
String sql =
"SELECT *\n"
+ "FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL
ConflictModel, DESCRIPTOR(a, b)))";
- assertReachesRelConverter(sql);
+ util.verifyRelPlan(sql);
}
@Test
@@ -215,14 +222,16 @@ public class MLPredictTableFunctionTest extends
TableTestBase {
+ "INPUT (x %s)\n"
+ "OUTPUT (res STRING)\n"
+ "with (\n"
- + " 'provider' = 'openai'\n"
+ + " 'task' = 'text_generation',\n"
+ + " 'endpoint' = 'someendpoint',\n"
+ + " 'provider' = 'test-model'"
+ ")",
modelType));
String sql =
"SELECT *\n"
+ "FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL
TypeModel, DESCRIPTOR(col)))";
- assertReachesRelConverter(sql);
+ util.verifyRelPlan(sql);
}
@ParameterizedTest
@@ -271,9 +280,70 @@ public class MLPredictTableFunctionTest extends
TableTestBase {
"ML_PREDICT config param can only be a MAP of string
literals. The item at position 1 is TRUE.");
}
- private void assertReachesRelConverter(String sql) {
+ @Test
+ public void testNonExistProvider() { // planning must fail with a ValidationException because no model provider can be created
+ util.tableEnv()
+ .executeSql(
+ "CREATE MODEL ConflictModel\n"
+ + "INPUT (a INT, b BIGINT)\n"
+ + "OUTPUT(c STRING, d ARRAY<INT>)\n"
+ + "with (\n"
+ + " 'task' = 'text_generation',\n"
+ + " 'endpoint' = 'someendpoint',\n"
+ + " 'provider' = 'non-exist-model'" // presumably no factory is registered under this identifier - verify against the test service file
+ + ")");
+
+ String sql =
+ "SELECT *\n"
+ + "FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL
ConflictModel, DESCRIPTOR(a, b)))";
+ assertThatThrownBy(() -> util.verifyRelPlan(sql))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Unable to create a model provider for model
'default_catalog.default_database.ConflictModel'.");
+ }
+
+ @Test
+ public void testNonPredictProvider() { // NOTE(review): body is identical to testNonExistProvider (same 'non-exist-model' provider, same expected ValidationException); the 'non-predict-model' case is exercised by testNotMLPredictRuntimeProvider - confirm whether this is a leftover copy
+ util.tableEnv()
+ .executeSql(
+ "CREATE MODEL ConflictModel\n"
+ + "INPUT (a INT, b BIGINT)\n"
+ + "OUTPUT(c STRING, d ARRAY<INT>)\n"
+ + "with (\n"
+ + " 'task' = 'text_generation',\n"
+ + " 'endpoint' = 'someendpoint',\n"
+ + " 'provider' = 'non-exist-model'"
+ + ")");
+
+ String sql =
+ "SELECT *\n"
+ + "FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL
ConflictModel, DESCRIPTOR(a, b)))";
+ assertThatThrownBy(() -> util.verifyRelPlan(sql))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Unable to create a model provider for model
'default_catalog.default_database.ConflictModel'.");
+ }
+
+ @Test
+ public void testNotMLPredictRuntimeProvider() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE MODEL ConflictModel\n"
+ + "INPUT (a INT, b BIGINT)\n"
+ + "OUTPUT(c STRING, d ARRAY<INT>)\n"
+ + "with (\n"
+ + " 'task' = 'text_generation',\n"
+ + " 'endpoint' = 'someendpoint',\n"
+ + " 'provider' = 'non-predict-model'"
+ + ")");
+
+ String sql =
+ "SELECT *\n"
+ + "FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL
ConflictModel, DESCRIPTOR(a, b)))";
assertThatThrownBy(() -> util.verifyRelPlan(sql))
- .hasMessageContaining("while converting MODEL");
+ .isInstanceOf(TableException.class)
+ .hasMessageContaining(
+ "This exception indicates that the query uses an
unsupported SQL feature.");
}
private static Stream<Arguments> compatibleTypeProvider() {
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.xml
new file mode 100644
index 00000000000..9ee6c7e384d
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.xml
@@ -0,0 +1,944 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testCompatibleInputTypes[[10] STRING, STRING]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(VARCHAR(2147483647) col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(VARCHAR(2147483647) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[11] BINARY(10), BINARY(10)]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(BINARY(10) col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(BINARY(10) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[12] VARBINARY(10), VARBINARY(10)]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(VARBINARY(10) col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(VARBINARY(10) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[13] DATE, DATE]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(DATE col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(DATE col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[14] TIME(3), TIME(3)]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[15] TIMESTAMP(3), TIMESTAMP(3)]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(TIMESTAMP(3) col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(TIMESTAMP(3) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[16] TIMESTAMP_LTZ(3),
TIMESTAMP_LTZ(3)]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) col, VARCHAR(2147483647)
res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) col, VARCHAR(2147483647)
res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[17] TINYINT, SMALLINT]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(TINYINT col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(TINYINT col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[18] SMALLINT, INT]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(SMALLINT col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(SMALLINT col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[19] INT, BIGINT]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(INTEGER col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(INTEGER col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[1] STRING NOT NULL, STRING]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(VARCHAR(2147483647) col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(VARCHAR(2147483647) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[20] FLOAT, DOUBLE]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(FLOAT col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(FLOAT col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[21] DECIMAL(5,2), DECIMAL(10,2)]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(DECIMAL(5, 2) col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(DECIMAL(5, 2) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[22] DECIMAL(10,2), DECIMAL(5,2)]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(DECIMAL(10, 2) col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(DECIMAL(10, 2) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[23] CHAR(10), STRING]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(CHAR(10) col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(CHAR(10) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[24] VARCHAR(20), STRING]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(VARCHAR(20) col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(VARCHAR(20) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[25] TIMESTAMP(3), TIMESTAMP(3)]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(TIMESTAMP(3) col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(TIMESTAMP(3) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[26] DATE, DATE]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(DATE col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(DATE col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[27] TIME(3), TIME(3)]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[28] ARRAY<INT>,
ARRAY<INT>]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(INTEGER ARRAY col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(INTEGER ARRAY col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[29] ARRAY<TINYINT>,
ARRAY<SMALLINT>]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(TINYINT ARRAY col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(TINYINT ARRAY col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[2] BOOLEAN, BOOLEAN]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(BOOLEAN col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(BOOLEAN col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[30] ARRAY<DECIMAL(5,2)>,
ARRAY<DECIMAL(10,2)>]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(DECIMAL(5, 2) ARRAY col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(DECIMAL(5, 2) ARRAY col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[31] ARRAY<VARCHAR(20)>,
ARRAY<STRING>]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(VARCHAR(20) ARRAY col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(VARCHAR(20) ARRAY col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[32] MAP<STRING, INT>,
MAP<STRING, INT>]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType((VARCHAR(2147483647), INTEGER) MAP col, VARCHAR(2147483647)
res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType((VARCHAR(2147483647), INTEGER) MAP col, VARCHAR(2147483647)
res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[33] MAP<STRING,
DECIMAL(5,2)>, MAP<STRING, DECIMAL(10,2)>]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType((VARCHAR(2147483647), DECIMAL(5, 2)) MAP col,
VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType((VARCHAR(2147483647), DECIMAL(5, 2)) MAP col,
VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[34] MAP<VARCHAR(20),
ARRAY<INT>>, MAP<STRING, ARRAY<INT>>]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType((VARCHAR(20), INTEGER ARRAY) MAP col, VARCHAR(2147483647)
res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType((VARCHAR(20), INTEGER ARRAY) MAP col, VARCHAR(2147483647)
res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[35] ROW<a INT, b STRING>,
ROW<a INT, b STRING>]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(RecordType:peek_no_expand(INTEGER a, VARCHAR(2147483647) b)
col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(RecordType:peek_no_expand(INTEGER a, VARCHAR(2147483647) b)
col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[36] ROW<a INT, b STRING>,
ROW<x INT, y STRING>]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(RecordType:peek_no_expand(INTEGER a, VARCHAR(2147483647) b)
col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(RecordType:peek_no_expand(INTEGER a, VARCHAR(2147483647) b)
col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[37] ROW<a DECIMAL(5,2), b
ARRAY<INT>>, ROW<a DECIMAL(10,2), b ARRAY<INT>>]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(RecordType:peek_no_expand(DECIMAL(5, 2) a, INTEGER ARRAY b)
col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(RecordType:peek_no_expand(DECIMAL(5, 2) a, INTEGER ARRAY b)
col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[38] ROW<a VARCHAR(20), b
MAP<STRING, INT>>, ROW<a STRING, b MAP<STRING,
INT>>]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(RecordType:peek_no_expand(VARCHAR(20) a,
(VARCHAR(2147483647), INTEGER) MAP b) col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(RecordType:peek_no_expand(VARCHAR(20) a,
(VARCHAR(2147483647), INTEGER) MAP b) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[39] ROW<a ARRAY<INT>,
b MAP<STRING, ARRAY<DECIMAL(5,2)>>>, ROW<a
ARRAY<INT>, b MAP<STRING, ARRAY<DECIMAL(10,2)>>>]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(RecordType:peek_no_expand(INTEGER ARRAY a,
(VARCHAR(2147483647), DECIMAL(5, 2) ARRAY) MAP b) col, VARCHAR(2147483647)
res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(RecordType:peek_no_expand(INTEGER ARRAY a,
(VARCHAR(2147483647), DECIMAL(5, 2) ARRAY) MAP b) col, VARCHAR(2147483647)
res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[3] TINYINT, TINYINT]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(TINYINT col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(TINYINT col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[40] MAP<STRING, ROW<a INT,
b ARRAY<VARCHAR(20)>>>, MAP<STRING, ROW<a INT, b
ARRAY<STRING>>>]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType((VARCHAR(2147483647), RecordType:peek_no_expand(INTEGER a,
VARCHAR(20) ARRAY b)) MAP col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType((VARCHAR(2147483647), RecordType:peek_no_expand(INTEGER a,
VARCHAR(20) ARRAY b)) MAP col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[4] SMALLINT, SMALLINT]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(SMALLINT col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(SMALLINT col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[5] INT, INT]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(INTEGER col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(INTEGER col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[6] BIGINT, BIGINT]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(BIGINT col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(BIGINT col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[7] FLOAT, FLOAT]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(FLOAT col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(FLOAT col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[8] DOUBLE, DOUBLE]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(DOUBLE col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(DOUBLE col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompatibleInputTypes[[9] DECIMAL(10,2), DECIMAL(10,2)]">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(DECIMAL(10, 2) col, VARCHAR(2147483647) res)])
+ +- LogicalProject(col=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))],
rowType=[RecordType(DECIMAL(10, 2) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNamedArguments">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(INPUT => TABLE MyTable, MODEL => MODEL MyModel, ARGS =>
DESCRIPTOR(a, b)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
e=[$6], f=[$7])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.MyModel), DESCRIPTOR(_UTF-16LE'a',
_UTF-16LE'b'))], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647)
c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3)
*PROCTIME* proctime, VARCHAR(2147483647) e, INTEGER ARRAY f)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[$5])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
e, f])
++- MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.MyModel), DESCRIPTOR(_UTF-16LE'a',
_UTF-16LE'b'))], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647)
c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3)
*PROCTIME* proctime, VARCHAR(2147483647) e, INTEGER ARRAY f)])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+ +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConfigWithCast">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL MyModel, DESCRIPTOR(a, b),
MAP['async', 'true', 'timeout', '100s']))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
e=[$6], f=[$7])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.MyModel), DESCRIPTOR(_UTF-16LE'a',
_UTF-16LE'b'), MAP(_UTF-16LE'async':VARCHAR(7) CHARACTER SET "UTF-16LE",
_UTF-16LE'true', _UTF-16LE'timeout':VARCHAR(7) CHARACTER SET "UTF-16LE",
_UTF-16LE'100s'))], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime,
TIMESTAMP_LTZ(3) *PROCTIME* proctime, VARCHAR(2147483647) e, INTEGER ARRAY f)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[$5])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
e, f])
++- MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.MyModel), DESCRIPTOR(_UTF-16LE'a',
_UTF-16LE'b'), MAP(_UTF-16LE'async':VARCHAR(7) CHARACTER SET "UTF-16LE",
_UTF-16LE'true', _UTF-16LE'timeout':VARCHAR(7) CHARACTER SET "UTF-16LE",
_UTF-16LE'100s'))], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime,
TIMESTAMP_LTZ(3) *PROCTIME* proctime, VARCHAR(2147483647) e, INTEGER ARRAY f)])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+ +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testOptionalNamedArguments">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(INPUT => TABLE MyTable, MODEL => MODEL MyModel, ARGS
=> DESCRIPTOR(a, b),CONFIG => MAP['key', 'value']))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
e=[$6], f=[$7])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.MyModel), DESCRIPTOR(_UTF-16LE'a',
_UTF-16LE'b'), MAP(_UTF-16LE'key', _UTF-16LE'value'))],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3)
d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime,
VARCHAR(2147483647) e, INTEGER ARRAY f)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[$5])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
e, f])
++- MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.MyModel), DESCRIPTOR(_UTF-16LE'a',
_UTF-16LE'b'), MAP(_UTF-16LE'key', _UTF-16LE'value'))],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3)
d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime,
VARCHAR(2147483647) e, INTEGER ARRAY f)])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+ +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConflictOutputColumnName">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL ConflictModel, DESCRIPTOR(a, b)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
c0=[$6], d0=[$7])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.ConflictModel), DESCRIPTOR(_UTF-16LE'a',
_UTF-16LE'b'))], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647)
c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3)
*PROCTIME* proctime, VARCHAR(2147483647) c0, INTEGER ARRAY d0)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[$5])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
c0, d0])
++- MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.ConflictModel), DESCRIPTOR(_UTF-16LE'a',
_UTF-16LE'b'))], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647)
c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3)
*PROCTIME* proctime, VARCHAR(2147483647) c0, INTEGER ARRAY d0)])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+ +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSimple">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL MyModel, DESCRIPTOR(a, b)))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5],
e=[$6], f=[$7])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.MyModel), DESCRIPTOR(_UTF-16LE'a',
_UTF-16LE'b'))], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647)
c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3)
*PROCTIME* proctime, VARCHAR(2147483647) e, INTEGER ARRAY f)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[$5])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4],
proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime,
e, f])
++- MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.MyModel), DESCRIPTOR(_UTF-16LE'a',
_UTF-16LE'b'))], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647)
c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3)
*PROCTIME* proctime, VARCHAR(2147483647) e, INTEGER ARRAY f)])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL
SECOND)])
+ +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+</Root>