This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2df2286ccd2 [FLINK-37791][table] Support to convert ML_PREDICT to 
physical node(#26603)
2df2286ccd2 is described below

commit 2df2286ccd275ba351f2aa8158ac8f756206daeb
Author: Hao Li <[email protected]>
AuthorDate: Tue Jun 3 23:15:30 2025 -0700

    [FLINK-37791][table] Support to convert ML_PREDICT to physical node(#26603)
---
 .../flink/table/factories/FactoryUtilTest.java     |   1 +
 .../table/factories/TestModelProviderFactory.java  |  11 +-
 ...ava => TestNonPredictModelProviderFactory.java} |   4 +-
 .../org.apache.flink.table.factories.Factory       |   1 +
 .../java/org/apache/calcite/sql/SqlModelCall.java  |   4 +
 .../planner/calcite/FlinkCalciteSqlValidator.java  |   4 +
 .../planner/calcite/FlinkConvertletTable.java      |   8 +
 .../flink/table/planner/calcite/RexModelCall.java  |  67 ++
 .../table/planner/catalog/CatalogSchemaModel.java  |  54 ++
 .../StreamPhysicalMLPredictTableFunction.java      |  77 ++
 .../StreamPhysicalMLPredictTableFunctionRule.java  |  84 ++
 .../FlinkChangelogModeInferenceProgram.scala       |   7 +-
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |   4 +-
 .../stream/sql/MLPredictTableFunctionTest.java     |  92 +-
 .../plan/stream/sql/MLPredictTableFunctionTest.xml | 944 +++++++++++++++++++++
 15 files changed, 1343 insertions(+), 19 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
index 5f7bfe61797..7b8237f6f84 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
@@ -760,6 +760,7 @@ class FactoryUtilTest {
                         + "' in the classpath.\n\n"
                         + "Available factory identifiers are:\n\n"
                         + "conflicting-model\n"
+                        + "non-predict-model\n"
                         + "test-model");
     }
 
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestModelProviderFactory.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestModelProviderFactory.java
index 725e25df24a..b4a2da061e8 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestModelProviderFactory.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestModelProviderFactory.java
@@ -22,12 +22,14 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.table.catalog.ResolvedCatalogModel;
 import org.apache.flink.table.factories.FactoryUtil.ModelProviderFactoryHelper;
+import org.apache.flink.table.functions.PredictFunction;
 import org.apache.flink.table.ml.ModelProvider;
+import org.apache.flink.table.ml.PredictRuntimeProvider;
 
 import java.util.HashSet;
 import java.util.Set;
 
-/** Test implementation for {@link ModelProviderFactory}. */
+/** Test implementation for {@link ModelProviderFactory} which creates 
PredictRuntimeProvider. */
 public final class TestModelProviderFactory implements ModelProviderFactory {
 
     public static final String IDENTIFIER = "test-model";
@@ -79,7 +81,7 @@ public final class TestModelProviderFactory implements 
ModelProviderFactory {
     }
 
     /** Test implementation of {@link ModelProvider} for testing purposes. */
-    public static class TestModelProviderMock implements ModelProvider {
+    public static class TestModelProviderMock implements 
PredictRuntimeProvider {
 
         private final ResolvedCatalogModel catalogModel;
 
@@ -102,5 +104,10 @@ public final class TestModelProviderFactory implements 
ModelProviderFactory {
             }
             return catalogModel.equals(((TestModelProviderMock) 
o).catalogModel);
         }
+
+        @Override
+        public PredictFunction createPredictFunction(Context context) {
+            throw new UnsupportedOperationException("To be implemented");
+        }
     }
 }
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestModelProviderFactory.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestNonPredictModelProviderFactory.java
similarity index 95%
copy from 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestModelProviderFactory.java
copy to 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestNonPredictModelProviderFactory.java
index 725e25df24a..d975f0a264e 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestModelProviderFactory.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestNonPredictModelProviderFactory.java
@@ -28,9 +28,9 @@ import java.util.HashSet;
 import java.util.Set;
 
 /** Test implementation for {@link ModelProviderFactory}. */
-public final class TestModelProviderFactory implements ModelProviderFactory {
+public final class TestNonPredictModelProviderFactory implements 
ModelProviderFactory {
 
-    public static final String IDENTIFIER = "test-model";
+    public static final String IDENTIFIER = "non-predict-model";
 
     public static final ConfigOption<String> TASK =
             ConfigOptions.key("task")
diff --git 
a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index bf2382c7310..3053d0f7814 100644
--- 
a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -24,6 +24,7 @@ 
org.apache.flink.table.factories.TestConflictingDynamicTableFactory2
 org.apache.flink.table.factories.TestCatalogFactory
 org.apache.flink.table.factories.TestCatalogStoreFactory
 org.apache.flink.table.factories.TestModelProviderFactory
+org.apache.flink.table.factories.TestNonPredictModelProviderFactory
 org.apache.flink.table.factories.TestConflictingModelProviderFactory1
 org.apache.flink.table.factories.TestConflictingModelProviderFactory2
 org.apache.flink.table.factories.module.DummyModuleFactory
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/SqlModelCall.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/SqlModelCall.java
index dc151792535..d0f160804c5 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/SqlModelCall.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/SqlModelCall.java
@@ -53,6 +53,10 @@ public class SqlModelCall extends SqlBasicCall {
         return model.getOutputRowType(validator.getTypeFactory());
     }
 
+    public CatalogSchemaModel getModel() {
+        return model;
+    }
+
     /**
      * A custom SqlOperator to handle model identifier.
      *
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index 799cc9277e6..f63b368cbad 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -131,6 +131,10 @@ public final class FlinkCalciteSqlValidator extends 
SqlValidatorImpl {
                         
.get(TableConfigOptions.TABLE_COLUMN_EXPANSION_STRATEGY);
     }
 
+    public RelOptCluster getRelOptCluster() {
+        return relOptCluster;
+    }
+
     public void setExpectedOutputType(SqlNode sqlNode, RelDataType 
expectedOutputType) {
         this.sqlNodeForExpectedOutputType = sqlNode;
         this.expectedOutputType = expectedOutputType;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
index c03c858efd6..569704f1d87 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
@@ -30,6 +30,7 @@ import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlIntervalQualifier;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlModelCall;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlOperator;
@@ -65,11 +66,18 @@ public class FlinkConvertletTable implements 
SqlRexConvertletTable {
             return this::convertTryCast;
         } else if (operator instanceof SqlTableFunction) {
             return this::convertTableArgs;
+        } else if (call instanceof SqlModelCall) {
+            return this::convertModelCall;
         }
 
         return StandardConvertletTable.INSTANCE.get(call);
     }
 
+    private RexNode convertModelCall(SqlRexContext cx, final SqlCall call) {
+        SqlModelCall modelCall = (SqlModelCall) call;
+        return modelCall.getModel().toRex(cx);
+    }
+
     // Slightly modified version of StandardConvertletTable::convertCast
     private RexNode convertTryCast(SqlRexContext cx, final SqlCall call) {
         RelDataTypeFactory typeFactory = cx.getTypeFactory();
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexModelCall.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexModelCall.java
new file mode 100644
index 00000000000..2e71d217f70
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexModelCall.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.calcite;
+
+import org.apache.flink.table.catalog.ContextResolvedModel;
+import org.apache.flink.table.ml.ModelProvider;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlSpecialOperator;
+
+import java.util.List;
+
+/**
+ * A {@link RexCall} that represents a call to a model provider model.
+ *
+ * <p>This is used to represent calls to models in the Flink SQL planner.
+ */
+public class RexModelCall extends RexCall {
+
+    private final ModelProvider modelProvider;
+    private final ContextResolvedModel contextResolvedModel;
+
+    public RexModelCall(
+            RelDataType outputType,
+            ContextResolvedModel contextResolvedModel,
+            ModelProvider modelProvider) {
+        super(outputType, new SqlSpecialOperator("Model", SqlKind.OTHER), 
List.of());
+        this.contextResolvedModel = contextResolvedModel;
+        this.modelProvider = modelProvider;
+    }
+
+    public ModelProvider getModelProvider() {
+        return modelProvider;
+    }
+
+    @Override
+    protected String computeDigest(boolean withType) {
+        final StringBuilder sb = new StringBuilder(op.getName());
+        sb.append("(");
+        sb.append("MODEL ")
+                .append(contextResolvedModel.getIdentifier().asSummaryString())
+                .append(")");
+        if (withType) {
+            sb.append(":");
+            sb.append(type.getFullTypeString());
+        }
+        return sb.toString();
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaModel.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaModel.java
index c26f3a16224..945e674ed6f 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaModel.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaModel.java
@@ -18,19 +18,34 @@
 
 package org.apache.flink.table.planner.catalog;
 
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.ContextResolvedModel;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.ModelProviderFactory;
+import org.apache.flink.table.ml.ModelProvider;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
+import org.apache.flink.table.planner.calcite.FlinkContext;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.RexModelCall;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
 import org.apache.flink.table.runtime.types.PlannerTypeUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 
+import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql2rel.SqlRexContext;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.OptionalUtils.firstPresent;
+
 /**
  * Represents a wrapper for {@link ContextResolvedModel} in {@link
  * org.apache.calcite.schema.Schema}.
@@ -75,6 +90,15 @@ public class CatalogSchemaModel {
         return schemaToRelDataType(flinkTypeFactory, schema);
     }
 
+    public RexNode toRex(SqlRexContext rexContext) {
+        FlinkCalciteSqlValidator validator = (FlinkCalciteSqlValidator) 
rexContext.getValidator();
+        RelOptCluster cluster = validator.getRelOptCluster();
+        FlinkContext context = ShortcutUtils.unwrapContext(cluster);
+        ModelProvider modelProvider = createModelProvider(context, 
contextResolvedModel);
+        return new RexModelCall(
+                getInputRowType(validator.getTypeFactory()), 
contextResolvedModel, modelProvider);
+    }
+
     private static RelDataType schemaToRelDataType(
             FlinkTypeFactory typeFactory, ResolvedSchema schema) {
         final List<String> fieldNames = schema.getColumnNames();
@@ -85,4 +109,34 @@ public class CatalogSchemaModel {
                         .collect(Collectors.toList());
         return typeFactory.buildRelNodeRowType(fieldNames, fieldTypes);
     }
+
+    private ModelProvider createModelProvider(
+            FlinkContext context, ContextResolvedModel catalogModel) {
+
+        final Optional<ModelProviderFactory> factoryFromCatalog =
+                catalogModel
+                        .getCatalog()
+                        .flatMap(Catalog::getFactory)
+                        .map(
+                                f ->
+                                        f instanceof ModelProviderFactory
+                                                ? (ModelProviderFactory) f
+                                                : null);
+
+        final Optional<ModelProviderFactory> factoryFromModule =
+                
context.getModuleManager().getFactory(Module::getModelProviderFactory);
+
+        // Since the catalog is more specific, we give it precedence over a 
factory provided by any
+        // modules.
+        final ModelProviderFactory factory =
+                firstPresent(factoryFromCatalog, 
factoryFromModule).orElse(null);
+
+        return FactoryUtil.createModelProvider(
+                factory,
+                contextResolvedModel.getIdentifier(),
+                contextResolvedModel.getResolvedModel(),
+                context.getTableConfig(),
+                context.getClassLoader(),
+                contextResolvedModel.isTemporary());
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunction.java
new file mode 100644
index 00000000000..faaf96f5dfe
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunction.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream;
+
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.type.RelDataType;
+
+import java.util.List;
+
+/** Stream physical RelNode for ml predict table function. */
+public class StreamPhysicalMLPredictTableFunction extends SingleRel implements 
StreamPhysicalRel {
+
+    private final RelDataType outputRowType;
+    private final FlinkLogicalTableFunctionScan scan;
+
+    public StreamPhysicalMLPredictTableFunction(
+            RelOptCluster cluster,
+            RelTraitSet traits,
+            RelNode inputRel,
+            FlinkLogicalTableFunctionScan scan,
+            RelDataType outputRowType) {
+        super(cluster, traits, inputRel);
+        this.scan = scan;
+        this.outputRowType = outputRowType;
+    }
+
+    @Override
+    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new StreamPhysicalMLPredictTableFunction(
+                getCluster(), traitSet, inputs.get(0), scan, getRowType());
+    }
+
+    @Override
+    public boolean requireWatermark() {
+        return false;
+    }
+
+    @Override
+    public ExecNode<?> translateToExecNode() {
+        return null;
+    }
+
+    @Override
+    protected RelDataType deriveRowType() {
+        return outputRowType;
+    }
+
+    @Override
+    public RelWriter explainTerms(RelWriter pw) {
+        return super.explainTerms(pw)
+                .item("invocation", scan.getCall())
+                .item("rowType", getRowType());
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunctionRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunctionRule.java
new file mode 100644
index 00000000000..640c01e495d
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunctionRule.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream;
+
+import org.apache.flink.table.ml.AsyncPredictRuntimeProvider;
+import org.apache.flink.table.ml.PredictRuntimeProvider;
+import org.apache.flink.table.planner.calcite.RexModelCall;
+import 
org.apache.flink.table.planner.functions.sql.ml.SqlMLPredictTableFunction;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexCall;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Rule to convert a {@link FlinkLogicalTableFunctionScan} with ml_predict 
call into a {@link
+ * StreamPhysicalMLPredictTableFunction}.
+ */
+public class StreamPhysicalMLPredictTableFunctionRule extends ConverterRule {
+
+    public static final StreamPhysicalMLPredictTableFunctionRule INSTANCE =
+            new StreamPhysicalMLPredictTableFunctionRule(
+                    Config.INSTANCE.withConversion(
+                            FlinkLogicalTableFunctionScan.class,
+                            FlinkConventions.LOGICAL(),
+                            FlinkConventions.STREAM_PHYSICAL(),
+                            "StreamPhysicalModelTableFunctionRule"));
+
+    private StreamPhysicalMLPredictTableFunctionRule(Config config) {
+        super(config);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        final FlinkLogicalTableFunctionScan scan = call.rel(0);
+        final RexCall rexCall = (RexCall) scan.getCall();
+        if (!(rexCall.getOperator() instanceof SqlMLPredictTableFunction)) {
+            return false;
+        }
+
+        final RexModelCall modelCall = (RexModelCall) 
rexCall.getOperands().get(1);
+        return modelCall.getModelProvider() instanceof PredictRuntimeProvider
+                || modelCall.getModelProvider() instanceof 
AsyncPredictRuntimeProvider;
+    }
+
+    @Override
+    public @Nullable RelNode convert(RelNode rel) {
+        final FlinkLogicalTableFunctionScan scan = 
(FlinkLogicalTableFunctionScan) rel;
+        final RelNode newInput =
+                RelOptRule.convert(scan.getInput(0), 
FlinkConventions.STREAM_PHYSICAL());
+
+        final RelTraitSet providedTraitSet =
+                rel.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL());
+
+        // TODO:
+        // Get model provider and context resolved model from RexModelCall
+        // Get table input from descriptor
+        // Get config from map
+        // Create ModelProviderSpec similar to DynamicTableSourceSpec and 
TemporalTableSourceSpec
+        return new StreamPhysicalMLPredictTableFunction(
+                scan.getCluster(), providedTraitSet, newInput, scan, 
scan.getRowType());
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index e84edf8ad2e..9e6bcd119ee 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -376,7 +376,7 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
       case _: StreamPhysicalCalcBase | _: StreamPhysicalCorrelateBase |
           _: StreamPhysicalLookupJoin | _: StreamPhysicalExchange | _: 
StreamPhysicalExpand |
           _: StreamPhysicalMiniBatchAssigner | _: 
StreamPhysicalWatermarkAssigner |
-          _: StreamPhysicalWindowTableFunction =>
+          _: StreamPhysicalWindowTableFunction | _: 
StreamPhysicalMLPredictTableFunction =>
         // transparent forward requiredTrait to children
         val children = visitChildren(rel, requiredTrait, requester)
         val childrenTrait = 
children.head.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
@@ -716,7 +716,7 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         case _: StreamPhysicalCorrelateBase | _: StreamPhysicalLookupJoin |
             _: StreamPhysicalExchange | _: StreamPhysicalExpand |
             _: StreamPhysicalMiniBatchAssigner | _: 
StreamPhysicalWatermarkAssigner |
-            _: StreamPhysicalWindowTableFunction =>
+            _: StreamPhysicalWindowTableFunction | _: 
StreamPhysicalMLPredictTableFunction =>
           // transparent forward requiredTrait to children
           visitChildren(rel, requiredUpdateTrait) match {
             case None => None
@@ -1083,7 +1083,8 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
             _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate |
             _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
             _: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
-            _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin 
=>
+            _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin 
|
+            _: StreamPhysicalMLPredictTableFunction =>
           // if not explicitly supported, all operators require full deletes 
if there are updates
           val children = rel.getInputs.map {
             case child: StreamPhysicalRel =>
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 74219150ed0..f24433deb4b 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.table.planner.plan.rules
 
 import org.apache.flink.table.planner.plan.nodes.logical._
-import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalProcessTableFunctionRule
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalMLPredictTableFunctionRule,
 StreamPhysicalProcessTableFunctionRule}
 import org.apache.flink.table.planner.plan.rules.logical._
 import 
org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule
 import org.apache.flink.table.planner.plan.rules.physical.stream._
@@ -470,6 +470,8 @@ object FlinkStreamRuleSets {
     StreamPhysicalWindowDeduplicateRule.INSTANCE,
     // process table function
     StreamPhysicalProcessTableFunctionRule.INSTANCE,
+    // model TVFs
+    StreamPhysicalMLPredictTableFunctionRule.INSTANCE,
     // join
     StreamPhysicalJoinRule.INSTANCE,
     StreamPhysicalIntervalJoinRule.INSTANCE,
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java
index 20bf939a807..9a786d9d009 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.stream.sql;
 
 import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.planner.utils.TableTestBase;
 import org.apache.flink.table.planner.utils.TableTestUtil;
@@ -64,7 +65,11 @@ public class MLPredictTableFunctionTest extends 
TableTestBase {
                                 + "INPUT (a INT, b BIGINT)\n"
                                 + "OUTPUT(e STRING, f ARRAY<INT>)\n"
                                 + "with (\n"
-                                + "  'provider' = 'openai'\n"
+                                + "  'provider' = 'test-model',\n" // test 
model provider defined in
+                                // TestModelProviderFactory in
+                                // flink-table-common
+                                + "  'endpoint' = 'someendpoint',\n"
+                                + "  'task' = 'text_generation'\n"
                                 + ")");
     }
 
@@ -75,7 +80,7 @@ public class MLPredictTableFunctionTest extends TableTestBase 
{
                         + "FROM TABLE(ML_PREDICT(INPUT => TABLE MyTable, "
                         + "MODEL => MODEL MyModel, "
                         + "ARGS  => DESCRIPTOR(a, b)))";
-        assertReachesRelConverter(sql);
+        util.verifyRelPlan(sql);
     }
 
     @Test
@@ -86,7 +91,7 @@ public class MLPredictTableFunctionTest extends TableTestBase 
{
                         + "MODEL  => MODEL MyModel, "
                         + "ARGS   => DESCRIPTOR(a, b),"
                         + "CONFIG => MAP['key', 'value']))";
-        assertReachesRelConverter(sql);
+        util.verifyRelPlan(sql);
     }
 
     @Test
@@ -94,7 +99,7 @@ public class MLPredictTableFunctionTest extends TableTestBase 
{
         String sql =
                 "SELECT *\n"
                         + "FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL MyModel, 
DESCRIPTOR(a, b)))";
-        assertReachesRelConverter(sql);
+        util.verifyRelPlan(sql);
     }
 
     @Test
@@ -103,7 +108,7 @@ public class MLPredictTableFunctionTest extends 
TableTestBase {
         String sql =
                 "SELECT *\n"
                         + "FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL MyModel, 
DESCRIPTOR(a, b), MAP['async', 'true', 'timeout', '100s']))";
-        assertReachesRelConverter(sql);
+        util.verifyRelPlan(sql);
     }
 
     @Test
@@ -142,13 +147,15 @@ public class MLPredictTableFunctionTest extends 
TableTestBase {
                                 + "INPUT (a INT, b BIGINT)\n"
                                 + "OUTPUT(c STRING, d ARRAY<INT>)\n"
                                 + "with (\n"
-                                + "  'provider' = 'openai'\n"
+                                + "  'task' = 'text_generation',\n"
+                                + "  'endpoint' = 'someendpoint',\n"
+                                + "  'provider' = 'test-model'"
                                 + ")");
 
         String sql =
                 "SELECT *\n"
                         + "FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL 
ConflictModel, DESCRIPTOR(a, b)))";
-        assertReachesRelConverter(sql);
+        util.verifyRelPlan(sql);
     }
 
     @Test
@@ -215,14 +222,16 @@ public class MLPredictTableFunctionTest extends 
TableTestBase {
                                         + "INPUT (x %s)\n"
                                         + "OUTPUT (res STRING)\n"
                                         + "with (\n"
-                                        + "  'provider' = 'openai'\n"
+                                        + "  'task' = 'text_generation',\n"
+                                        + "  'endpoint' = 'someendpoint',\n"
+                                        + "  'provider' = 'test-model'"
                                         + ")",
                                 modelType));
 
         String sql =
                 "SELECT *\n"
                         + "FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL 
TypeModel, DESCRIPTOR(col)))";
-        assertReachesRelConverter(sql);
+        util.verifyRelPlan(sql);
     }
 
     @ParameterizedTest
@@ -271,9 +280,70 @@ public class MLPredictTableFunctionTest extends 
TableTestBase {
                         "ML_PREDICT config param can only be a MAP of string 
literals. The item at position 1 is TRUE.");
     }
 
-    private void assertReachesRelConverter(String sql) {
+    @Test
+    public void testNonExistProvider() {
+        util.tableEnv()
+                .executeSql(
+                        "CREATE MODEL ConflictModel\n"
+                                + "INPUT (a INT, b BIGINT)\n"
+                                + "OUTPUT(c STRING, d ARRAY<INT>)\n"
+                                + "with (\n"
+                                + "  'task' = 'text_generation',\n"
+                                + "  'endpoint' = 'someendpoint',\n"
+                                + "  'provider' = 'non-exist-model'"
+                                + ")");
+
+        String sql =
+                "SELECT *\n"
+                        + "FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL 
ConflictModel, DESCRIPTOR(a, b)))";
+        assertThatThrownBy(() -> util.verifyRelPlan(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Unable to create a model provider for model 
'default_catalog.default_database.ConflictModel'.");
+    }
+
+    @Test
+    public void testNonPredictProvider() {
+        util.tableEnv()
+                .executeSql(
+                        "CREATE MODEL ConflictModel\n"
+                                + "INPUT (a INT, b BIGINT)\n"
+                                + "OUTPUT(c STRING, d ARRAY<INT>)\n"
+                                + "with (\n"
+                                + "  'task' = 'text_generation',\n"
+                                + "  'endpoint' = 'someendpoint',\n"
+                                + "  'provider' = 'non-exist-model'"
+                                + ")");
+
+        String sql =
+                "SELECT *\n"
+                        + "FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL 
ConflictModel, DESCRIPTOR(a, b)))";
+        assertThatThrownBy(() -> util.verifyRelPlan(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Unable to create a model provider for model 
'default_catalog.default_database.ConflictModel'.");
+    }
+
+    @Test
+    public void testNotMLPredictRuntimeProvider() {
+        util.tableEnv()
+                .executeSql(
+                        "CREATE MODEL ConflictModel\n"
+                                + "INPUT (a INT, b BIGINT)\n"
+                                + "OUTPUT(c STRING, d ARRAY<INT>)\n"
+                                + "with (\n"
+                                + "  'task' = 'text_generation',\n"
+                                + "  'endpoint' = 'someendpoint',\n"
+                                + "  'provider' = 'non-predict-model'"
+                                + ")");
+
+        String sql =
+                "SELECT *\n"
+                        + "FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL 
ConflictModel, DESCRIPTOR(a, b)))";
         assertThatThrownBy(() -> util.verifyRelPlan(sql))
-                .hasMessageContaining("while converting MODEL");
+                .isInstanceOf(TableException.class)
+                .hasMessageContaining(
+                        "This exception indicates that the query uses an 
unsupported SQL feature.");
     }
 
     private static Stream<Arguments> compatibleTypeProvider() {
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.xml
new file mode 100644
index 00000000000..9ee6c7e384d
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.xml
@@ -0,0 +1,944 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testCompatibleInputTypes[[10] STRING, STRING]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(VARCHAR(2147483647) col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(VARCHAR(2147483647) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[11] BINARY(10), BINARY(10)]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(BINARY(10) col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(BINARY(10) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[12] VARBINARY(10), VARBINARY(10)]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(VARBINARY(10) col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(VARBINARY(10) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[13] DATE, DATE]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(DATE col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(DATE col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[14] TIME(3), TIME(3)]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[15] TIMESTAMP(3), TIMESTAMP(3)]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(TIMESTAMP(3) col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(TIMESTAMP(3) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[16] TIMESTAMP_LTZ(3), 
TIMESTAMP_LTZ(3)]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) col, VARCHAR(2147483647) 
res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) col, VARCHAR(2147483647) 
res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[17] TINYINT, SMALLINT]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(TINYINT col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(TINYINT col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[18] SMALLINT, INT]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(SMALLINT col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(SMALLINT col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[19] INT, BIGINT]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(INTEGER col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(INTEGER col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[1] STRING NOT NULL, STRING]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(VARCHAR(2147483647) col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(VARCHAR(2147483647) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[20] FLOAT, DOUBLE]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(FLOAT col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(FLOAT col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[21] DECIMAL(5,2), DECIMAL(10,2)]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(DECIMAL(5, 2) col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(DECIMAL(5, 2) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[22] DECIMAL(10,2), DECIMAL(5,2)]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(DECIMAL(10, 2) col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(DECIMAL(10, 2) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[23] CHAR(10), STRING]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(CHAR(10) col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(CHAR(10) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[24] VARCHAR(20), STRING]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(VARCHAR(20) col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(VARCHAR(20) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[25] TIMESTAMP(3), TIMESTAMP(3)]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(TIMESTAMP(3) col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(TIMESTAMP(3) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[26] DATE, DATE]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(DATE col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(DATE col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[27] TIME(3), TIME(3)]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[28] ARRAY&#60;INT&#62;, 
ARRAY&#60;INT&#62;]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(INTEGER ARRAY col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(INTEGER ARRAY col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[29] ARRAY&#60;TINYINT&#62;, 
ARRAY&#60;SMALLINT&#62;]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(TINYINT ARRAY col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(TINYINT ARRAY col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[2] BOOLEAN, BOOLEAN]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(BOOLEAN col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(BOOLEAN col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[30] ARRAY&#60;DECIMAL(5,2)&#62;, 
ARRAY&#60;DECIMAL(10,2)&#62;]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(DECIMAL(5, 2) ARRAY col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(DECIMAL(5, 2) ARRAY col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[31] ARRAY&#60;VARCHAR(20)&#62;, 
ARRAY&#60;STRING&#62;]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(VARCHAR(20) ARRAY col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(VARCHAR(20) ARRAY col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[32] MAP&#60;STRING, INT&#62;, 
MAP&#60;STRING, INT&#62;]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType((VARCHAR(2147483647), INTEGER) MAP col, VARCHAR(2147483647) 
res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType((VARCHAR(2147483647), INTEGER) MAP col, VARCHAR(2147483647) 
res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[33] MAP&#60;STRING, 
DECIMAL(5,2)&#62;, MAP&#60;STRING, DECIMAL(10,2)&#62;]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType((VARCHAR(2147483647), DECIMAL(5, 2)) MAP col, 
VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType((VARCHAR(2147483647), DECIMAL(5, 2)) MAP col, 
VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[34] MAP&#60;VARCHAR(20), 
ARRAY&#60;INT&#62;&#62;, MAP&#60;STRING, ARRAY&#60;INT&#62;&#62;]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType((VARCHAR(20), INTEGER ARRAY) MAP col, VARCHAR(2147483647) 
res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType((VARCHAR(20), INTEGER ARRAY) MAP col, VARCHAR(2147483647) 
res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[35] ROW&#60;a INT, b STRING&#62;, 
ROW&#60;a INT, b STRING&#62;]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(RecordType:peek_no_expand(INTEGER a, VARCHAR(2147483647) b) 
col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(RecordType:peek_no_expand(INTEGER a, VARCHAR(2147483647) b) 
col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[36] ROW&#60;a INT, b STRING&#62;, 
ROW&#60;x INT, y STRING&#62;]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(RecordType:peek_no_expand(INTEGER a, VARCHAR(2147483647) b) 
col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(RecordType:peek_no_expand(INTEGER a, VARCHAR(2147483647) b) 
col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[37] ROW&#60;a DECIMAL(5,2), b 
ARRAY&#60;INT&#62;&#62;, ROW&#60;a DECIMAL(10,2), b ARRAY&#60;INT&#62;&#62;]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(RecordType:peek_no_expand(DECIMAL(5, 2) a, INTEGER ARRAY b) 
col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(RecordType:peek_no_expand(DECIMAL(5, 2) a, INTEGER ARRAY b) 
col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[38] ROW&#60;a VARCHAR(20), b 
MAP&#60;STRING, INT&#62;&#62;, ROW&#60;a STRING, b MAP&#60;STRING, 
INT&#62;&#62;]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(RecordType:peek_no_expand(VARCHAR(20) a, 
(VARCHAR(2147483647), INTEGER) MAP b) col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(RecordType:peek_no_expand(VARCHAR(20) a, 
(VARCHAR(2147483647), INTEGER) MAP b) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[39] ROW&#60;a ARRAY&#60;INT&#62;, 
b MAP&#60;STRING, ARRAY&#60;DECIMAL(5,2)&#62;&#62;&#62;, ROW&#60;a 
ARRAY&#60;INT&#62;, b MAP&#60;STRING, ARRAY&#60;DECIMAL(10,2)&#62;&#62;&#62;]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(RecordType:peek_no_expand(INTEGER ARRAY a, 
(VARCHAR(2147483647), DECIMAL(5, 2) ARRAY) MAP b) col, VARCHAR(2147483647) 
res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(RecordType:peek_no_expand(INTEGER ARRAY a, 
(VARCHAR(2147483647), DECIMAL(5, 2) ARRAY) MAP b) col, VARCHAR(2147483647) 
res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[3] TINYINT, TINYINT]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(TINYINT col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(TINYINT col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[40] MAP&#60;STRING, ROW&#60;a INT, 
b ARRAY&#60;VARCHAR(20)&#62;&#62;&#62;, MAP&#60;STRING, ROW&#60;a INT, b 
ARRAY&#60;STRING&#62;&#62;&#62;]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType((VARCHAR(2147483647), RecordType:peek_no_expand(INTEGER a, 
VARCHAR(20) ARRAY b)) MAP col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType((VARCHAR(2147483647), RecordType:peek_no_expand(INTEGER a, 
VARCHAR(20) ARRAY b)) MAP col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[4] SMALLINT, SMALLINT]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(SMALLINT col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(SMALLINT col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[5] INT, INT]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(INTEGER col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(INTEGER col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[6] BIGINT, BIGINT]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(BIGINT col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(BIGINT col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[7] FLOAT, FLOAT]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(FLOAT col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(FLOAT col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[8] DOUBLE, DOUBLE]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(DOUBLE col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(DOUBLE col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompatibleInputTypes[[9] DECIMAL(10,2), DECIMAL(10,2)]">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, DESCRIPTOR(col)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(col=[$0], res=[$1])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(DECIMAL(10, 2) col, VARCHAR(2147483647) res)])
+   +- LogicalProject(col=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'))], 
rowType=[RecordType(DECIMAL(10, 2) col, VARCHAR(2147483647) res)])
++- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNamedArguments">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(INPUT => TABLE MyTable, MODEL => MODEL MyModel, ARGS  => 
DESCRIPTOR(a, b)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
e=[$6], f=[$7])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.MyModel), DESCRIPTOR(_UTF-16LE'a', 
_UTF-16LE'b'))], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) 
c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) 
*PROCTIME* proctime, VARCHAR(2147483647) e, INTEGER ARRAY f)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[$5])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
e, f])
++- MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.MyModel), DESCRIPTOR(_UTF-16LE'a', 
_UTF-16LE'b'))], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) 
c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) 
*PROCTIME* proctime, VARCHAR(2147483647) e, INTEGER ARRAY f)])
+   +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
+      +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConfigWithCast">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL MyModel, DESCRIPTOR(a, b), 
MAP['async', 'true', 'timeout', '100s']))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
e=[$6], f=[$7])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.MyModel), DESCRIPTOR(_UTF-16LE'a', 
_UTF-16LE'b'), MAP(_UTF-16LE'async':VARCHAR(7) CHARACTER SET "UTF-16LE", 
_UTF-16LE'true', _UTF-16LE'timeout':VARCHAR(7) CHARACTER SET "UTF-16LE", 
_UTF-16LE'100s'))], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, 
TIMESTAMP_LTZ(3) *PROCTIME* proctime, VARCHAR(2147483647) e, INTEGER A [...]
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[$5])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
e, f])
++- MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.MyModel), DESCRIPTOR(_UTF-16LE'a', 
_UTF-16LE'b'), MAP(_UTF-16LE'async':VARCHAR(7) CHARACTER SET "UTF-16LE", 
_UTF-16LE'true', _UTF-16LE'timeout':VARCHAR(7) CHARACTER SET "UTF-16LE", 
_UTF-16LE'100s'))], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, 
TIMESTAMP_LTZ(3) *PROCTIME* proctime, VARCHAR(2147483647) e, INTEGER ARRAY f)])
+   +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
+      +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOptionalNamedArguments">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(INPUT => TABLE MyTable, MODEL  => MODEL MyModel, ARGS   
=> DESCRIPTOR(a, b),CONFIG => MAP['key', 'value']))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
e=[$6], f=[$7])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.MyModel), DESCRIPTOR(_UTF-16LE'a', 
_UTF-16LE'b'), MAP(_UTF-16LE'key', _UTF-16LE'value'))], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) 
d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, 
VARCHAR(2147483647) e, INTEGER ARRAY f)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[$5])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
e, f])
++- MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.MyModel), DESCRIPTOR(_UTF-16LE'a', 
_UTF-16LE'b'), MAP(_UTF-16LE'key', _UTF-16LE'value'))], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) 
d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, 
VARCHAR(2147483647) e, INTEGER ARRAY f)])
+   +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
+      +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConflictOutputColumnName">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL ConflictModel, DESCRIPTOR(a, b)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
c0=[$6], d0=[$7])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.ConflictModel), DESCRIPTOR(_UTF-16LE'a', 
_UTF-16LE'b'))], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) 
c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) 
*PROCTIME* proctime, VARCHAR(2147483647) c0, INTEGER ARRAY d0)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[$5])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
c0, d0])
++- MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.ConflictModel), DESCRIPTOR(_UTF-16LE'a', 
_UTF-16LE'b'))], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) 
c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) 
*PROCTIME* proctime, VARCHAR(2147483647) c0, INTEGER ARRAY d0)])
+   +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
+      +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSimple">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM TABLE(ML_PREDICT(TABLE MyTable, MODEL MyModel, DESCRIPTOR(a, b)))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], 
e=[$6], f=[$7])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.MyModel), DESCRIPTOR(_UTF-16LE'a', 
_UTF-16LE'b'))], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) 
c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) 
*PROCTIME* proctime, VARCHAR(2147483647) e, INTEGER ARRAY f)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[$5])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
1000:INTERVAL SECOND)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], 
proctime=[PROCTIME()])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
e, f])
++- MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.MyModel), DESCRIPTOR(_UTF-16LE'a', 
_UTF-16LE'b'))], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) 
c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) 
*PROCTIME* proctime, VARCHAR(2147483647) e, INTEGER ARRAY f)])
+   +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
+      +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+</Root>


Reply via email to