fsk119 commented on code in PR #27108:
URL: https://github.com/apache/flink/pull/27108#discussion_r2479936108


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ModelImpl.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ApiExpression;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.api.Model;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.ContextResolvedModel;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.types.ColumnList;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import static org.apache.flink.table.api.Expressions.lit;
+
+/** Implementation of {@link Model} that works with the Table API. */
+@Internal
+public class ModelImpl implements Model {
+
+    private final TableEnvironmentInternal tableEnvironment;
+    private final ContextResolvedModel model;
+
+    private ModelImpl(TableEnvironmentInternal tableEnvironment, 
ContextResolvedModel model) {
+        this.tableEnvironment = tableEnvironment;
+        this.model = model;
+    }
+
+    public static ModelImpl createModel(
+            TableEnvironmentInternal tableEnvironment, ContextResolvedModel 
model) {
+        return new ModelImpl(tableEnvironment, model);
+    }
+
+    public ContextResolvedModel getModel() {
+        return model;
+    }
+
+    @Override
+    public ResolvedSchema getResolvedInputSchema() {
+        return model.getResolvedModel().getResolvedInputSchema();
+    }
+
+    @Override
+    public ResolvedSchema getResolvedOutputSchema() {
+        return model.getResolvedModel().getResolvedOutputSchema();
+    }
+
+    public TableEnvironment getTableEnv() {
+        return tableEnvironment;
+    }
+
+    @Override
+    public Table predict(Table table, ColumnList inputColumns) {
+        return predict(table, inputColumns, Map.of());
+    }
+
+    @Override
+    public Table predict(Table table, ColumnList inputColumns, Map<String, 
String> options) {
+        // Use Expressions.map() instead of Expression.lit() to create a MAP 
literal since
+        // lit() is not serializable to sql.
+        if (options.isEmpty()) {
+            return tableEnvironment.fromCall(
+                    "ML_PREDICT",

Review Comment:
   use 
`org.apache.flink.table.functions.BuiltInFunctionDefinitions#ML_PREDICT.getName()`
 instead?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ModelImpl.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ApiExpression;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.api.Model;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.ContextResolvedModel;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.types.ColumnList;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import static org.apache.flink.table.api.Expressions.lit;
+
+/** Implementation of {@link Model} that works with the Table API. */
+@Internal
+public class ModelImpl implements Model {
+
+    private final TableEnvironmentInternal tableEnvironment;
+    private final ContextResolvedModel model;
+
+    private ModelImpl(TableEnvironmentInternal tableEnvironment, 
ContextResolvedModel model) {
+        this.tableEnvironment = tableEnvironment;
+        this.model = model;
+    }
+
+    public static ModelImpl createModel(
+            TableEnvironmentInternal tableEnvironment, ContextResolvedModel 
model) {
+        return new ModelImpl(tableEnvironment, model);
+    }
+
+    public ContextResolvedModel getModel() {
+        return model;
+    }
+
+    @Override
+    public ResolvedSchema getResolvedInputSchema() {
+        return model.getResolvedModel().getResolvedInputSchema();
+    }
+
+    @Override
+    public ResolvedSchema getResolvedOutputSchema() {
+        return model.getResolvedModel().getResolvedOutputSchema();
+    }
+
+    public TableEnvironment getTableEnv() {
+        return tableEnvironment;
+    }
+
+    @Override
+    public Table predict(Table table, ColumnList inputColumns) {
+        return predict(table, inputColumns, Map.of());
+    }
+
+    @Override
+    public Table predict(Table table, ColumnList inputColumns, Map<String, 
String> options) {
+        // Use Expressions.map() instead of Expression.lit() to create a MAP 
literal since
+        // lit() is not serializable to sql.
+        if (options.isEmpty()) {
+            return tableEnvironment.fromCall(
+                    "ML_PREDICT",
+                    table.asArgument("INPUT"),
+                    this.asArgument("MODEL"),
+                    Expressions.descriptor(inputColumns).asArgument("ARGS"));
+        }
+        ArrayList<String> configKVs = new ArrayList<>();
+        options.forEach(
+                (k, v) -> {
+                    configKVs.add(k);
+                    configKVs.add(v);
+                });
+        return tableEnvironment.fromCall(
+                "ML_PREDICT",
+                table.asArgument("INPUT"),
+                this.asArgument("MODEL"),
+                Expressions.descriptor(inputColumns).asArgument("ARGS"),
+                Expressions.map(

Review Comment:
   use Expressions.mapFromArrays instead.
   
   



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Model.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.types.ColumnList;
+
+import java.util.Map;
+
+/**
+ * The {@link Model} object is the core abstraction for ML model resources in 
the Table API.
+ *
+ * <p>A {@link Model} object describes a machine learning model resource that 
can be used for
+ * inference operations. It provides methods to perform prediction on data 
tables.
+ *
+ * <p>The {@link Model} interface offers main operations:
+ *
+ * <ul>
+ *   <li>{@link #predict(Table, ColumnList)} - Applies the model to make 
predictions on input data
+ * </ul>
+ *
+ * <p>{@code ml_predict} operation supports runtime options for configuring 
execution parameters
+ * such as asynchronous execution mode.
+ *
+ * <p>Every {@link Model} object has input and output schemas that describe 
the expected data
+ * structure for model operations, available through {@link 
#getResolvedInputSchema()} and {@link
+ * #getResolvedOutputSchema()}.
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * Model model = tableEnv.fromModelPath("my_model");
+ *
+ * // Simple prediction
+ * Table predictions = model.predict(inputTable, ColumnList.of("feature1", 
"feature2"));
+ *
+ * // Prediction with options
+ * Map<String, String> options = Map.of("max-concurrent-operations", "100", 
"timeout", "30s", "async", "true");
+ * Table predictions = model.predict(inputTable, ColumnList.of("feature1", 
"feature2"), options);
+ * }</pre>
+ */
+@PublicEvolving
+public interface Model {
+
+    /**
+     * Returns the resolved input schema of this model.
+     *
+     * <p>The input schema describes the structure and data types of the input 
columns that the
+     * model expects for inference operations.
+     *
+     * @return the resolved input schema.
+     */
+    ResolvedSchema getResolvedInputSchema();
+
+    /**
+     * Returns the resolved output schema of this model.
+     *
+     * <p>The output schema describes the structure and data types of the 
output columns that the
+     * model produces during inference operations.
+     *
+     * @return the resolved output schema.
+     */
+    ResolvedSchema getResolvedOutputSchema();
+
+    /**
+     * Performs prediction on the given table using specified input columns.
+     *
+     * <p>This method applies the model to the input data to generate 
predictions. The input columns
+     * must match the model's expected input schema.
+     *
+     * <p>Example:
+     *
+     * <pre>{@code
+     * Table predictions = model.predict(inputTable, ColumnList.of("feature1", 
"feature2"));
+     * }</pre>
+     *
+     * @param table the input table containing data for prediction
+     * @param inputColumns the columns from the input table to use as model 
input
+     * @return a table containing the input data along with prediction results
+     */
+    Table predict(Table table, ColumnList inputColumns);
+
+    /**
+     * Performs prediction on the given table using specified input columns 
with runtime options.
+     *
+     * <p>This method applies the model to the input data to generate 
predictions with additional
+     * runtime configuration options such as max-concurrent-operations, 
timeout, and execution mode
+     * settings.
+     *
+     * <p>Common runtime options include:

Review Comment:
   It's not easy for the community to maintain the Javadoc about these options. 
How about we just link to MLPredictRuntimeConfigOptions?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ModelImpl.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ApiExpression;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.api.Model;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.ContextResolvedModel;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.types.ColumnList;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import static org.apache.flink.table.api.Expressions.lit;
+
+/** Implementation of {@link Model} that works with the Table API. */
+@Internal
+public class ModelImpl implements Model {
+
+    private final TableEnvironmentInternal tableEnvironment;
+    private final ContextResolvedModel model;
+
+    private ModelImpl(TableEnvironmentInternal tableEnvironment, 
ContextResolvedModel model) {
+        this.tableEnvironment = tableEnvironment;
+        this.model = model;
+    }
+
+    public static ModelImpl createModel(
+            TableEnvironmentInternal tableEnvironment, ContextResolvedModel 
model) {
+        return new ModelImpl(tableEnvironment, model);
+    }
+
+    public ContextResolvedModel getModel() {
+        return model;
+    }
+
+    @Override
+    public ResolvedSchema getResolvedInputSchema() {
+        return model.getResolvedModel().getResolvedInputSchema();
+    }
+
+    @Override
+    public ResolvedSchema getResolvedOutputSchema() {
+        return model.getResolvedModel().getResolvedOutputSchema();
+    }
+
+    public TableEnvironment getTableEnv() {
+        return tableEnvironment;
+    }
+
+    @Override
+    public Table predict(Table table, ColumnList inputColumns) {
+        return predict(table, inputColumns, Map.of());
+    }
+
+    @Override
+    public Table predict(Table table, ColumnList inputColumns, Map<String, 
String> options) {
+        // Use Expressions.map() instead of Expression.lit() to create a MAP 
literal since

Review Comment:
   typo: Expressions.lit() ?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -682,6 +686,29 @@ public Table fromCall(Class<? extends UserDefinedFunction> 
function, Object... a
                 operationTreeBuilder.tableFunction(Expressions.call(function, 
arguments)));
     }
 
+    @Override
+    public Model fromModelPath(String modelPath) {
+        UnresolvedIdentifier unresolvedIdentifier = 
getParser().parseIdentifier(modelPath);
+        ObjectIdentifier modelIdentifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        return catalogManager
+                .getModel(modelIdentifier)
+                .map(this::createModel)
+                .orElseThrow(
+                        () ->
+                                new ValidationException(

Review Comment:
   throw ModelNotExistException?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ModelReferenceExpression.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Model;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.ContextResolvedModel;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A reference to a {@link Model} in an expression context.
+ *
+ * <p>This expression is used when a model needs to be passed as an argument 
to functions or
+ * operations that accept model references. It wraps a model object and 
provides the necessary
+ * expression interface for use in the Table API expression system.
+ *
+ * <p>The expression carries a string representation of the model and uses a 
special data type to
+ * indicate that this is a model reference rather than a regular data value.
+ */
+@Internal
+public final class ModelReferenceExpression implements ResolvedExpression {
+
+    private final String name;
+    private final ContextResolvedModel model;
+    private final TableEnvironment env;
+
+    public ModelReferenceExpression(String name, ContextResolvedModel model, 
TableEnvironment env) {
+        this.name = Preconditions.checkNotNull(name);
+        this.model = Preconditions.checkNotNull(model);
+        this.env = Preconditions.checkNotNull(env);
+    }
+
+    /**
+     * Returns the name of this model reference.
+     *
+     * @return the model reference name
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Returns the ContextResolvedModel associated with this model reference.
+     *
+     * @return the query context resolved model
+     */
+    public ContextResolvedModel getModel() {
+        return model;
+    }
+
+    public @Nullable TableEnvironment getTableEnvironment() {
+        return env;
+    }
+
+    /**
+     * Returns the input data type expected by this model reference.
+     *
+     * <p>This method extracts the input data type from the model's input 
schema, which describes
+     * the structure and data types that the model expects for inference 
operations.
+     *
+     * @return the input data type expected by the model
+     */
+    public DataType getInputDataType() {
+        return DataTypeUtils.fromResolvedSchemaPreservingTimeAttributes(
+                model.getResolvedModel().getResolvedInputSchema());
+    }
+
+    @Override
+    public DataType getOutputDataType() {
+        return DataTypeUtils.fromResolvedSchemaPreservingTimeAttributes(
+                model.getResolvedModel().getResolvedOutputSchema());
+    }
+
+    @Override
+    public List<ResolvedExpression> getResolvedChildren() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public String asSerializableString(SqlFactory sqlFactory) {
+        return "MODEL " + model.getIdentifier().asSerializableString();
+    }
+
+    @Override
+    public String asSummaryString() {
+        return name;
+    }
+
+    @Override
+    public List<Expression> getChildren() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public <R> R accept(ExpressionVisitor<R> visitor) {
+        return visitor.visit(this);

Review Comment:
   Do we need to add a default method for ResolvedExpressionDefaultVisitor?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionVisitor.java:
##########
@@ -49,6 +49,8 @@ public final R visit(Expression other) {
 
     public abstract R visit(TableReferenceExpression tableReference);
 
+    public abstract R visit(ModelReferenceExpression modelReferenceExpression);

Review Comment:
   Add `else if ` branch in line 30?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -682,6 +686,29 @@ public Table fromCall(Class<? extends UserDefinedFunction> 
function, Object... a
                 operationTreeBuilder.tableFunction(Expressions.call(function, 
arguments)));
     }
 
+    @Override
+    public Model fromModelPath(String modelPath) {
+        UnresolvedIdentifier unresolvedIdentifier = 
getParser().parseIdentifier(modelPath);
+        ObjectIdentifier modelIdentifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        return catalogManager
+                .getModel(modelIdentifier)
+                .map(this::createModel)
+                .orElseThrow(
+                        () ->
+                                new ValidationException(
+                                        String.format(
+                                                "Model %s was not found.", 
unresolvedIdentifier)));
+    }
+
+    @Override
+    public Model from(ModelDescriptor descriptor) {
+        Preconditions.checkNotNull(descriptor, "Model descriptor must not be 
null.");

Review Comment:
   nit: No need for the check.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ModelReferenceExpression.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Model;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.ContextResolvedModel;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A reference to a {@link Model} in an expression context.
+ *
+ * <p>This expression is used when a model needs to be passed as an argument 
to functions or
+ * operations that accept model references. It wraps a model object and 
provides the necessary
+ * expression interface for use in the Table API expression system.
+ *
+ * <p>The expression carries a string representation of the model and uses a 
special data type to
+ * indicate that this is a model reference rather than a regular data value.
+ */
+@Internal
+public final class ModelReferenceExpression implements ResolvedExpression {
+
+    private final String name;
+    private final ContextResolvedModel model;
+    private final TableEnvironment env;

Review Comment:
   Why do we need TableEnv here? 



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MLPredictTestPrograms.java:
##########
@@ -139,11 +159,114 @@ public class MLPredictTestPrograms {
             TableTestProgram.of(
                             "sync-ml-predict-with-runtime-options",
                             "ml-predict in sync mode with runtime config.")
-                    .setupTableSource(FEATURES_TABLE)
+                    .setupTableSource(RESTORE_FEATURES_TABLE)
                     .setupModel(ASYNC_MODEL)
-                    .setupTableSink(SINK_TABLE)
+                    .setupTableSink(RESTORE_SINK_TABLE)
                     .runSql(
                             "INSERT INTO sink_t SELECT * FROM ML_PREDICT(TABLE 
features, MODEL chatgpt, DESCRIPTOR(feature), MAP['async', 'false'])")
                     .build();
-    ;
+
+    public static final TableTestProgram SYNC_ML_PREDICT_TABLE_API =
+            TableTestProgram.of(
+                            "sync-ml-predict-table-api", "ml-predict in sync 
mode using Table API.")
+                    .setupTableSource(SIMPLE_FEATURES_SOURCE)
+                    .setupModel(SYNC_MODEL)
+                    .setupTableSink(SIMPLE_SINK)
+                    .runTableApi(
+                            env ->
+                                    env.fromCall(
+                                            "ML_PREDICT",
+                                            
env.from("features").asArgument("INPUT"),
+                                            
env.fromModel("chatgpt").asArgument("MODEL"),
+                                            
descriptor("feature").asArgument("ARGS")),
+                            "sink")
+                    .build();
+
+    public static final TableTestProgram ASYNC_ML_PREDICT_TABLE_API =
+            TableTestProgram.of(
+                            "async-ml-predict-table-api",
+                            "ml-predict in async mode using Table API.")
+                    .setupTableSource(SIMPLE_FEATURES_SOURCE)
+                    .setupModel(ASYNC_MODEL)
+                    .setupTableSink(SIMPLE_SINK)
+                    .setupConfig(
+                            
ExecutionConfigOptions.TABLE_EXEC_ASYNC_ML_PREDICT_OUTPUT_MODE,
+                            
ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED)
+                    .runTableApi(
+                            env ->
+                                    env.fromCall(
+                                            "ML_PREDICT",
+                                            
env.from("features").asArgument("INPUT"),
+                                            
env.fromModel("chatgpt").asArgument("MODEL"),
+                                            
descriptor("feature").asArgument("ARGS"),
+                                            Expressions.lit(
+                                                            Map.of("async", 
"true"),
+                                                            DataTypes.MAP(
+                                                                            
DataTypes.STRING(),
+                                                                            
DataTypes.STRING())
+                                                                    .notNull())
+                                                    .asArgument("CONFIG")),
+                            "sink")
+                    .build();
+
+    public static final TableTestProgram 
ASYNC_ML_PREDICT_TABLE_API_MAP_EXPRESSION_CONFIG =
+            TableTestProgram.of(
+                            "async-ml-predict-table-api-map-expression-config",
+                            "ml-predict in async mode using Table API and map 
expression.")
+                    .setupTableSource(SIMPLE_FEATURES_SOURCE)
+                    .setupModel(ASYNC_MODEL)
+                    .setupTableSink(SIMPLE_SINK)
+                    .setupConfig(
+                            
ExecutionConfigOptions.TABLE_EXEC_ASYNC_ML_PREDICT_OUTPUT_MODE,
+                            
ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED)
+                    .runTableApi(
+                            env ->
+                                    env.fromCall(
+                                            "ML_PREDICT",
+                                            
env.from("features").asArgument("INPUT"),
+                                            
env.fromModel("chatgpt").asArgument("MODEL"),
+                                            
descriptor("feature").asArgument("ARGS"),
+                                            Expressions.map(
+                                                            "async",
+                                                            "true",
+                                                            
"max-concurrent-operations",
+                                                            "10")
+                                                    .asArgument("CONFIG")),
+                            "sink")
+                    .build();
+
+    public static final TableTestProgram ML_PREDICT_MODEL_API =
+            TableTestProgram.of("ml-predict-model-api", "ml-predict using 
model API")
+                    .setupTableSource(SIMPLE_FEATURES_SOURCE)
+                    .setupModel(SYNC_MODEL)
+                    .setupTableSink(SIMPLE_SINK)
+                    .runTableApi(
+                            env ->
+                                    env.fromModel("chatgpt")
+                                            .predict(
+                                                    env.from("features"), 
ColumnList.of("feature")),
+                            "sink")
+                    .build();
+
+    public static final TableTestProgram ASYNC_ML_PREDICT_MODEL_API =
+            TableTestProgram.of("async-ml-predict-model-api", "async 
ml-predict using model API")
+                    .setupTableSource(SIMPLE_FEATURES_SOURCE)
+                    .setupModel(ASYNC_MODEL)
+                    .setupTableSink(SIMPLE_SINK)
+                    .setupConfig(
+                            
ExecutionConfigOptions.TABLE_EXEC_ASYNC_ML_PREDICT_OUTPUT_MODE,
+                            
ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED)
+                    .runTableApi(
+                            env ->
+                                    env.fromModel("chatgpt")

Review Comment:
   No tests for anonymous models?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java:
##########
@@ -334,6 +343,68 @@ public RelNode visit(FunctionQueryOperation functionTable) 
{
                                                             new int[0]);
                                             inputStack.add(relBuilder.build());
                                             return tableArgCall;
+                                        } else if (resolvedArg

Review Comment:
   I don't think it's a good idea to use `instanceof` here. It would be better to reuse 
ExpressionConverter to convert these expressions. How about letting 
`ExpressionConverter` extend `ResolvedExpressionVisitor<RexNode>`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to