twalthr commented on code in PR #27108:
URL: https://github.com/apache/flink/pull/27108#discussion_r2503865587


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##########
@@ -1175,6 +1175,49 @@ void createTemporarySystemFunction(
      */
     Table fromCall(Class<? extends UserDefinedFunction> function, Object... 
arguments);
 
+    /**
+     * Returns a {@link Model} object that is backed by the specified model 
path.
+     *
+     * <p>This method creates a {@link Model} object from a given model path 
in the catalog. The
+     * model path can be fully or partially qualified (e.g., 
"catalog.db.model" or just "model"),
+     * depending on the current catalog and database context.
+     *
+     * <p>The returned {@link Model} object can be used for further 
transformations or as input to
+     * other operations in the Table API.
+     *
+     * <p>Example:
+     *
+     * <pre>{@code
+     * Model model = tableEnv.fromModelPath("my_model");
+     * }</pre>
+     *
+     * @param modelPath The path of the model in the catalog.
+     * @return The {@link Model} object describing the model resource.
+     */
+    Model fromModelPath(String modelPath);
+
+    /**
+     * Returns a {@link Model} object that is backed by the specified {@link 
ModelDescriptor}.
+     *
+     * <p>This method creates a {@link Model} object using the provided {@link 
ModelDescriptor},
+     * which contains the necessary information to identify and configure the 
model resource in the
+     * catalog.
+     *
+     * <p>The returned {@link Model} object can be used for further 
transformations or as input to
+     * other operations in the Table API.
+     *
+     * <p>Example:
+     *
+     * <pre>{@code
+     * ModelDescriptor descriptor = ...;
+     * Model model = tableEnv.from(descriptor);
+     * }</pre>
+     *
+     * @param descriptor The {@link ModelDescriptor} describing the model 
resource.
+     * @return The {@link Model} object representing the model resource.
+     */
+    Model from(ModelDescriptor descriptor);

Review Comment:
   Call this `fromModel` to distinguish it from the heavily overloaded
   `from()` method. We should reserve `from()` for tables.
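
A minimal sketch of the naming idea, for illustration only. `Env`, `Table`, `Model`, and both descriptor classes below are hypothetical stand-ins and not the Flink types; the sketch only assumes that `from()` keeps returning tables while models get a dedicated `fromModel` family.

```java
// Hypothetical stand-ins for illustration; these are NOT the Flink classes.
final class Table {
    final String path;
    Table(String path) { this.path = path; }
}

final class Model {
    final String path;
    Model(String path) { this.path = path; }
}

final class TableDescriptor {
    final String connector;
    TableDescriptor(String connector) { this.connector = connector; }
}

final class ModelDescriptor {
    final String provider;
    ModelDescriptor(String provider) { this.provider = provider; }
}

final class Env {
    // `from` is reserved for tables, whatever the argument type.
    Table from(String path) {
        return new Table(path);
    }

    Table from(TableDescriptor descriptor) {
        return new Table("anonymous:" + descriptor.connector);
    }

    // Models get their own method name, so the two overload families never mix.
    Model fromModel(String path) {
        return new Model(path);
    }

    Model fromModel(ModelDescriptor descriptor) {
        return new Model("anonymous:" + descriptor.provider);
    }
}
```

With this split, a reader can tell from the method name alone whether a call site produces a table or a model, without inspecting the argument type.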



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##########
@@ -1175,6 +1175,49 @@ void createTemporarySystemFunction(
      */
     Table fromCall(Class<? extends UserDefinedFunction> function, Object... 
arguments);
 
+    /**
+     * Returns a {@link Model} object that is backed by the specified model 
path.
+     *
+     * <p>This method creates a {@link Model} object from a given model path 
in the catalog. The
+     * model path can be fully or partially qualified (e.g., 
"catalog.db.model" or just "model"),
+     * depending on the current catalog and database context.
+     *
+     * <p>The returned {@link Model} object can be used for further 
transformations or as input to
+     * other operations in the Table API.
+     *
+     * <p>Example:
+     *
+     * <pre>{@code
+     * Model model = tableEnv.fromModelPath("my_model");
+     * }</pre>
+     *
+     * @param modelPath The path of the model in the catalog.
+     * @return The {@link Model} object describing the model resource.
+     */
+    Model fromModelPath(String modelPath);

Review Comment:
   nit: I'm still wondering whether we should call this `fromModel`. It would 
align better with `fromCall`.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java:
##########
@@ -156,6 +156,19 @@ public static ApiExpression descriptor(String... 
columnNames) {
         return new 
ApiExpression(valueLiteral(ColumnList.of(Arrays.asList(columnNames))));
     }
 
+    /**
+     * Creates a literal describing an arbitrary, unvalidated list of column 
names.
+     *
+     * <p>Passing a column list can be useful for parameterizing a function. 
In particular, it
+     * enables declaring the {@code on_time} argument for {@link 
ProcessTableFunction} or the {@code
+     * inputColumns} for {@link Model#predict}.
+     *
+     * <p>The data type will be {@link DataTypes#DESCRIPTOR()}.
+     */
+    public static ApiExpression descriptor(ColumnList columnList) {

Review Comment:
   This method is unnecessary. It is like a "descriptor in a descriptor". You
   can pass a `ColumnList` directly to any method that takes expressions.
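
A rough sketch of why the extra wrapper adds nothing, assuming that methods taking expressions wrap non-expression arguments into literals on their own. `Expr`, `Literal`, `ColumnNames`, and `Calls` are hypothetical, simplified stand-ins and not the Flink implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified model of an expression API; NOT the Flink implementation.
interface Expr {}

final class Literal implements Expr {
    final Object value;
    Literal(Object value) { this.value = value; }
}

// Stand-in for a column list value.
final class ColumnNames {
    final List<String> names;
    ColumnNames(String... names) { this.names = List.of(names); }
}

final class Calls {
    // Arguments that are already expressions pass through unchanged;
    // everything else is wrapped into a literal automatically.
    static Expr toExpr(Object arg) {
        return arg instanceof Expr ? (Expr) arg : new Literal(arg);
    }

    // A method "that takes expressions": it accepts arbitrary objects
    // and converts each of them via toExpr.
    static List<Expr> call(Object... args) {
        return Arrays.stream(args).map(Calls::toExpr).collect(Collectors.toList());
    }
}
```

Under that assumption, passing the column list value directly and passing it through an explicit wrapper both end up as the same literal, so the wrapper is redundant.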



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ModelReferenceExpression.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Model;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ContextResolvedModel;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A reference to a {@link Model} in an expression context.
+ *
+ * <p>This expression is used when a model needs to be passed as an argument 
to functions or
+ * operations that accept model references. It wraps a model object and 
provides the necessary
+ * expression interface for use in the Table API expression system.
+ *
+ * <p>The expression carries a string representation of the model and uses a 
special data type to
+ * indicate that this is a model reference rather than a regular data value.
+ */
+@Internal
+public final class ModelReferenceExpression implements ResolvedExpression {
+
+    private final String name;
+    private final ContextResolvedModel model;
+    // The environment is optional but serves validation purposes
+    // to ensure that all referenced tables belong to the same
+    // environment.
+    private final TableEnvironment env;
+
+    public ModelReferenceExpression(String name, ContextResolvedModel model, 
TableEnvironment env) {
+        this.name = Preconditions.checkNotNull(name);
+        this.model = Preconditions.checkNotNull(model);
+        this.env = Preconditions.checkNotNull(env);
+    }
+
+    /**
+     * Returns the name of this model reference.
+     *
+     * @return the model reference name
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Returns the ContextResolvedModel associated with this model reference.
+     *
+     * @return the query context resolved model
+     */
+    public ContextResolvedModel getModel() {
+        return model;
+    }
+
+    public @Nullable TableEnvironment getTableEnvironment() {
+        return env;
+    }
+
+    /**
+     * Returns the input data type expected by this model reference.
+     *
+     * <p>This method extracts the input data type from the model's input 
schema, which describes
+     * the structure and data types that the model expects for inference 
operations.
+     *
+     * @return the input data type expected by the model
+     */
+    public DataType getInputDataType() {
+        return DataTypeUtils.fromResolvedSchemaPreservingTimeAttributes(

Review Comment:
   Time attributes play no role for models. We can remove the call here and
   below.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ModelImpl.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ApiExpression;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.api.Model;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.ContextResolvedModel;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.types.ColumnList;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import static org.apache.flink.table.api.Expressions.lit;
+
+/** Implementation of {@link Model} that works with the Table API. */
+@Internal
+public class ModelImpl implements Model {
+
+    private final TableEnvironmentInternal tableEnvironment;
+    private final ContextResolvedModel model;
+
+    private ModelImpl(TableEnvironmentInternal tableEnvironment, 
ContextResolvedModel model) {
+        this.tableEnvironment = tableEnvironment;
+        this.model = model;
+    }
+
+    public static ModelImpl createModel(
+            TableEnvironmentInternal tableEnvironment, ContextResolvedModel 
model) {
+        return new ModelImpl(tableEnvironment, model);
+    }
+
+    public ContextResolvedModel getModel() {
+        return model;
+    }
+
+    @Override
+    public ResolvedSchema getResolvedInputSchema() {
+        return model.getResolvedModel().getResolvedInputSchema();
+    }
+
+    @Override
+    public ResolvedSchema getResolvedOutputSchema() {
+        return model.getResolvedModel().getResolvedOutputSchema();
+    }
+
+    public TableEnvironment getTableEnv() {
+        return tableEnvironment;
+    }
+
+    @Override
+    public Table predict(Table table, ColumnList inputColumns) {
+        return predict(table, inputColumns, Map.of());
+    }
+
+    @Override
+    public Table predict(Table table, ColumnList inputColumns, Map<String, 
String> options) {
+        // Use Expressions.map() instead of Expressions.lit() to create a MAP 
literal since
+        // lit() is not serializable to sql.
+        if (options.isEmpty()) {
+            return tableEnvironment.fromCall(
+                    BuiltInFunctionDefinitions.ML_PREDICT.getName(),

Review Comment:
   We don't need to call `getName` here. Ideally, we go one level deeper and
   use `ApiExpressionUtils.unresolvedCall` together with
   `operationTreeBuilder`, passing the `FunctionDefinition` directly. This
   avoids a catalog lookup for the function name; catalog lookups are
   expensive.
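
The cost argument can be sketched as follows. `FunctionDef` and `FunctionCatalog` are hypothetical stand-ins and not Flink classes; the sketch only shows that a name-based call has to resolve the name first, while a call that already holds the definition skips that step.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; these are NOT Flink classes.
final class FunctionDef {
    final String name;
    FunctionDef(String name) { this.name = name; }
}

final class FunctionCatalog {
    private final Map<String, FunctionDef> functions = new HashMap<>();

    // Counts how often a name had to be resolved against the catalog.
    int lookups = 0;

    void register(FunctionDef def) {
        functions.put(def.name, def);
    }

    // Name-based call: pays for a lookup on every invocation.
    String callByName(String name) {
        lookups++;
        FunctionDef def = functions.get(name);
        if (def == null) {
            throw new IllegalArgumentException("Unknown function: " + name);
        }
        return callByDefinition(def);
    }

    // Definition-based call: the caller already holds the definition,
    // so no lookup is needed.
    String callByDefinition(FunctionDef def) {
        return def.name + "(...)";
    }
}
```

Both paths build the same call, but only the name-based one touches the catalog, which is the lookup the suggestion aims to avoid.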



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java:
##########
@@ -256,12 +269,55 @@ public RexNode visit(Expression other) {
                     local.getName(),
                     typeFactory.createFieldTypeFromLogicalType(
                             
fromDataTypeToLogicalType(local.getOutputDataType())));
+        } else if (other instanceof ModelReferenceExpression) {
+            return visit((ModelReferenceExpression) other);
         } else {
             throw new UnsupportedOperationException(
                     other.getClass().getSimpleName() + ":" + other.toString());
         }
     }
 
+    public RexNode visit(ModelReferenceExpression modelRef) {
+        final ContextResolvedModel contextResolvedModel = modelRef.getModel();
+        final FlinkContext flinkContext = 
ShortcutUtils.unwrapContext(relBuilder);
+
+        final Optional<ModelProviderFactory> factoryFromCatalog =
+                contextResolvedModel
+                        .getCatalog()
+                        .flatMap(Catalog::getFactory)
+                        .map(
+                                f ->
+                                        f instanceof ModelProviderFactory
+                                                ? (ModelProviderFactory) f
+                                                : null);
+
+        final Optional<ModelProviderFactory> factoryFromModule =
+                
flinkContext.getModuleManager().getFactory(Module::getModelProviderFactory);
+
+        // Since the catalog is more specific, we give it
+        // precedence over a factory provided by any
+        // modules.
+        final ModelProviderFactory factory =
+                firstPresent(factoryFromCatalog, 
factoryFromModule).orElse(null);
+
+        final ModelProvider modelProvider =
+                FactoryUtil.createModelProvider(
+                        factory,
+                        contextResolvedModel.getIdentifier(),
+                        contextResolvedModel.getResolvedModel(),
+                        flinkContext.getTableConfig(),
+                        flinkContext.getClassLoader(),
+                        contextResolvedModel.isTemporary());
+        final LogicalType modelOutputType =
+                DataTypeUtils.fromResolvedSchemaPreservingTimeAttributes(

Review Comment:
   Time attributes are not relevant for models.



##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/FailingTableApiTestStep.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.program;
+
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.test.program.TableApiTestStep.TableEnvAccessor;
+import org.apache.flink.util.Preconditions;
+
+import java.util.function.Function;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Test step for executing Table API query that will fail eventually with 
either {@link
+ * ValidationException} (during planning time) or {@link 
TableRuntimeException} (during execution
+ * time).
+ *
+ * <p>Similar to {@link FailingSqlTestStep} but uses Table API instead of SQL.
+ */
+public final class FailingTableApiTestStep implements TestStep {
+
+    private final Function<TableEnvAccessor, Table> tableQuery;
+    private final String sinkName;
+    public final Class<? extends Exception> expectedException;
+    public final String expectedErrorMessage;
+
+    FailingTableApiTestStep(
+            Function<TableEnvAccessor, Table> tableQuery,
+            String sinkName,
+            Class<? extends Exception> expectedException,
+            String expectedErrorMessage) {
+        Preconditions.checkArgument(
+                expectedException == ValidationException.class
+                        || expectedException == TableRuntimeException.class,
+                "Usually a Table API query should fail with either validation 
or runtime exception. "
+                        + "Otherwise this might require an update to the 
exception design.");
+        this.tableQuery = tableQuery;
+        this.sinkName = sinkName;
+        this.expectedException = expectedException;
+        this.expectedErrorMessage = expectedErrorMessage;
+    }
+
+    @Override
+    public TestKind getKind() {
+        return TestKind.FAILING_TABLE_API;
+    }
+
+    public Table toTable(TableEnvironment env) {
+        return tableQuery.apply(
+                new TableEnvAccessor() {
+                    @Override
+                    public Table from(String path) {
+                        return env.from(path);
+                    }
+
+                    @Override
+                    public Table fromCall(String path, Object... arguments) {
+                        return env.fromCall(path, arguments);
+                    }
+
+                    @Override
+                    public Table fromCall(
+                            Class<? extends 
org.apache.flink.table.functions.UserDefinedFunction>
+                                    function,
+                            Object... arguments) {
+                        return env.fromCall(function, arguments);
+                    }
+
+                    @Override
+                    public Table fromValues(Object... values) {
+                        return env.fromValues(values);
+                    }
+
+                    @Override
+                    public Table fromValues(
+                            org.apache.flink.table.types.AbstractDataType<?> 
dataType,
+                            Object... values) {
+                        return env.fromValues(dataType, values);
+                    }
+
+                    @Override
+                    public Table sqlQuery(String query) {
+                        return env.sqlQuery(query);
+                    }
+
+                    @Override
+                    public org.apache.flink.table.api.Model fromModel(String 
modelPath) {

Review Comment:
   Use imports instead of fully qualified class names throughout this class:
   ```suggestion
                       public Model fromModel(String modelPath) {
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FunctionCallUtil.java:
##########
@@ -35,20 +35,27 @@
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
 
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
 import static org.apache.calcite.sql.SqlKind.MAP_VALUE_CONSTRUCTOR;
+import static 
org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType;
+import static 
org.apache.flink.table.types.logical.LogicalTypeFamily.CHARACTER_STRING;
 
 /** Common utils for function call, e.g. ML_PREDICT and Lookup Join. */
 public abstract class FunctionCallUtil {
 
+    private static final String CONFIG_ERROR_MESSAGE =
+            "Config parameter should be a MAP data type consisting String 
literals.";

Review Comment:
   ```suggestion
               "Config parameter should be a MAP data type consisting of string 
literals.";
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/MLPredictITCase.java:
##########
@@ -156,6 +159,23 @@ public void testPredictWithConstantValues() {
                 .containsExactlyInAnyOrder(Row.of(1L, "x1", 1, "z1"), 
Row.of(2L, "x2", 2, "z2"));
     }
 
+    @Test
+    public void testPredictTableApiWithView() {

Review Comment:
   Is this test still necessary after all semantic tests?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -682,6 +686,29 @@ public Table fromCall(Class<? extends UserDefinedFunction> 
function, Object... a
                 operationTreeBuilder.tableFunction(Expressions.call(function, 
arguments)));
     }
 
+    @Override
+    public Model fromModelPath(String modelPath) {
+        UnresolvedIdentifier unresolvedIdentifier = 
getParser().parseIdentifier(modelPath);
+        ObjectIdentifier modelIdentifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        return catalogManager
+                .getModel(modelIdentifier)
+                .map(this::createModel)
+                .orElseThrow(
+                        () ->
+                                new ValidationException(

Review Comment:
   `ModelNotExistException` is for the catalog API only.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java:
##########
@@ -1117,6 +1125,97 @@ private static Instant dayOfSeconds(int second) {
                             "sink")
                     .build();
 
+    public static final TableTestProgram ML_PREDICT_MODEL_API =
+            TableTestProgram.of("ml-predict-model-api", "ml-predict using 
model API")
+                    .setupTableSource(SIMPLE_FEATURES_SOURCE)
+                    .setupModel(SYNC_MODEL)
+                    .setupTableSink(SIMPLE_SINK)
+                    .runTableApi(
+                            env ->
+                                    env.fromModel("chatgpt")
+                                            .predict(
+                                                    env.from("features"), 
ColumnList.of("feature")),
+                            "sink")
+                    .build();
+
+    public static final TableTestProgram ASYNC_ML_PREDICT_MODEL_API =
+            TableTestProgram.of("async-ml-predict-model-api", "async 
ml-predict using model API")
+                    .setupTableSource(SIMPLE_FEATURES_SOURCE)
+                    .setupModel(ASYNC_MODEL)
+                    .setupTableSink(SIMPLE_SINK)
+                    .setupConfig(
+                            
ExecutionConfigOptions.TABLE_EXEC_ASYNC_ML_PREDICT_OUTPUT_MODE,
+                            
ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED)

Review Comment:
   nit: simplify code
   ```suggestion
                               AsyncOutputMode.ALLOW_UNORDERED)
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ModelReferenceExpression.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Model;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ContextResolvedModel;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A reference to a {@link Model} in an expression context.
+ *
+ * <p>This expression is used when a model needs to be passed as an argument 
to functions or
+ * operations that accept model references. It wraps a model object and 
provides the necessary
+ * expression interface for use in the Table API expression system.
+ *
+ * <p>The expression carries a string representation of the model and uses a 
special data type to
+ * indicate that this is a model reference rather than a regular data value.
+ */
+@Internal
+public final class ModelReferenceExpression implements ResolvedExpression {
+
+    private final String name;
+    private final ContextResolvedModel model;
+    private final TableEnvironment env;
+
+    public ModelReferenceExpression(String name, ContextResolvedModel model, 
TableEnvironment env) {
+        this.name = Preconditions.checkNotNull(name);
+        this.model = Preconditions.checkNotNull(model);
+        this.env = Preconditions.checkNotNull(env);
+    }
+
+    /**
+     * Returns the name of this model reference.
+     *
+     * @return the model reference name
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Returns the ContextResolvedModel associated with this model reference.
+     *
+     * @return the query context resolved model
+     */
+    public ContextResolvedModel getModel() {
+        return model;
+    }
+
+    public @Nullable TableEnvironment getTableEnvironment() {
+        return env;
+    }
+
+    /**
+     * Returns the input data type expected by this model reference.
+     *
+     * <p>This method extracts the input data type from the model's input 
schema, which describes
+     * the structure and data types that the model expects for inference 
operations.
+     *
+     * @return the input data type expected by the model
+     */
+    public DataType getInputDataType() {
+        return DataTypeUtils.fromResolvedSchemaPreservingTimeAttributes(
+                model.getResolvedModel().getResolvedInputSchema());
+    }
+
+    @Override
+    public DataType getOutputDataType() {
+        return DataTypeUtils.fromResolvedSchemaPreservingTimeAttributes(
+                model.getResolvedModel().getResolvedOutputSchema());
+    }
+
+    @Override
+    public List<ResolvedExpression> getResolvedChildren() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public String asSerializableString(SqlFactory sqlFactory) {
+        if (model.isAnonymous()) {
+            throw new ValidationException("Anonymous models cannot be 
serialized.");
+        }
+
+        return "MODEL " + model.getIdentifier().asSerializableString();

Review Comment:
   We should add a test to `QueryOperationSqlSerializationTest`.


