This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 16c2e46  [FLINK-22426][table] Fix several shortcomings that prevent schema expressions
16c2e46 is described below

commit 16c2e467b66d6d1c7b8b847e923c6823056181a2
Author: Timo Walther <twal...@apache.org>
AuthorDate: Fri Apr 23 14:09:13 2021 +0200

    [FLINK-22426][table] Fix several shortcomings that prevent schema expressions
    
    This fixes a couple of critical bugs in the stack that prevented Table API
    expressions from being used for schema declarations. Due to time constraints,
    this PR could not make it into the 1.13 release but should be added to
    the next bugfix release for a smoother user experience. For testing and
    consistency, it exposes the Table API expression sourceWatermark() in the
    Java, Scala, and Python APIs.
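
    A minimal usage sketch of the exposed expression in the Java API, mirroring
    the new SchemaResolutionTest case and assuming a connector that implements
    SupportsSourceWatermark (the surrounding class is illustrative only):

        import static org.apache.flink.table.api.Expressions.sourceWatermark;

        import org.apache.flink.table.api.DataTypes;
        import org.apache.flink.table.api.Schema;

        public class SourceWatermarkSchemaExample {
            public static void main(String[] args) {
                // Delegate the watermark strategy to the source; the marker
                // expression is pushed down into table sources that implement
                // SupportsSourceWatermark.
                final Schema schema =
                        Schema.newBuilder()
                                .column("ts_ltz", DataTypes.TIMESTAMP_LTZ(3))
                                .watermark("ts_ltz", sourceWatermark())
                                .build();
                System.out.println(schema);
            }
        }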
    
    This closes #15798
---
 flink-python/pyflink/table/expressions.py          | 14 ++++
 .../org/apache/flink/table/api/Expressions.java    | 16 ++++
 .../table/catalog/QueryOperationCatalogView.java   | 53 ++++++++-----
 .../resolver/rules/ResolveCallByArgumentsRule.java | 33 ++++----
 .../flink/table/catalog/SchemaResolutionTest.java  | 89 +++++++++------------
 .../table/api/ImplicitExpressionConversions.scala  | 27 +++++--
 .../table/types/inference/TypeInferenceUtil.java   | 81 ++++++++-----------
 .../inference/InputTypeStrategiesTestBase.java     |  2 +-
 .../table/planner/catalog/CatalogSchemaTable.java  | 56 ++++---------
 .../table/planner/connectors/DynamicSinkUtils.java | 91 ++++++++++------------
 .../planner/connectors/DynamicSourceUtils.java     | 12 +--
 .../expressions/converter/ExpressionConverter.java | 15 +++-
 .../plan/nodes/exec/common/CommonExecSink.java     | 50 ++++++++----
 .../flink/table/planner/utils/ShortcutUtils.java   |  8 ++
 .../table/planner/calcite/FlinkTypeFactory.scala   | 16 ++++
 .../table/planner/delegation/PlannerBase.scala     | 10 +--
 .../plan/schema/LegacyCatalogSourceTable.scala     |  2 +-
 .../flink/table/planner/sinks/TableSinkUtils.scala | 11 ---
 .../runtime/stream/sql/DataStreamJavaITCase.java   | 17 ++--
 19 files changed, 331 insertions(+), 272 deletions(-)

diff --git a/flink-python/pyflink/table/expressions.py b/flink-python/pyflink/table/expressions.py
index 179fac4..0b3a60b 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -477,6 +477,20 @@ def log(v, base=None) -> Expression[float]:
         return _binary_op("log", base, v)
 
 
+def source_watermark() -> Expression:
+    """
+    Source watermark declaration for schema.
+
+    This is a marker function that doesn't have concrete runtime implementation. It can only
+    be used as a single expression for watermark strategies in schema declarations. The declaration
+    will be pushed down into a table source that implements the `SupportsSourceWatermark`
+    interface. The source will emit system-defined watermarks afterwards.
+
+    Please check the documentation whether the connector supports source watermarks.
+    """
+    return _leaf_op("sourceWatermark")
+
+
 def if_then_else(condition: Union[bool, Expression[bool]], if_true, if_false) -> Expression:
     """
     Ternary conditional operator that decides which of two other expressions should be evaluated
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
index 0444287..b88be0f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ResolvedExpression;
@@ -465,6 +466,21 @@ public final class Expressions {
     }
 
     /**
+     * Source watermark declaration for {@link Schema}.
+     *
+     * <p>This is a marker function that doesn't have concrete runtime implementation. It can only
+     * be used as a single expression in {@link Schema.Builder#watermark(String, Expression)}. The
+     * declaration will be pushed down into a table source that implements the {@link
+     * SupportsSourceWatermark} interface. The source will emit system-defined watermarks
+     * afterwards.
+     *
+     * <p>Please check the documentation whether the connector supports source watermarks.
+     */
+    public static ApiExpression sourceWatermark() {
+        return apiCall(BuiltInFunctionDefinitions.SOURCE_WATERMARK);
+    }
+
+    /**
      * Ternary conditional operator that decides which of two other expressions should be evaluated
      * based on a evaluated boolean condition.
      *
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java
index 5559813..dc4f0a7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java
@@ -19,31 +19,21 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.operations.QueryOperation;
 
-import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 
-/**
- * A view created from a {@link QueryOperation} via operations on {@link
- * org.apache.flink.table.api.Table}.
- */
+/** A view created from a {@link QueryOperation} via operations on {@link Table}. */
 @Internal
-public class QueryOperationCatalogView extends AbstractCatalogView {
+public final class QueryOperationCatalogView implements CatalogView {
+
     private final QueryOperation queryOperation;
 
     public QueryOperationCatalogView(QueryOperation queryOperation) {
-        this(queryOperation, "");
-    }
-
-    public QueryOperationCatalogView(QueryOperation queryOperation, String comment) {
-        super(
-                queryOperation.asSummaryString(),
-                queryOperation.asSummaryString(),
-                TableSchema.fromResolvedSchema(queryOperation.getResolvedSchema()),
-                new HashMap<>(),
-                comment);
         this.queryOperation = queryOperation;
     }
 
@@ -52,8 +42,23 @@ public class QueryOperationCatalogView extends AbstractCatalogView {
     }
 
     @Override
+    public Schema getUnresolvedSchema() {
+        return Schema.newBuilder().fromResolvedSchema(queryOperation.getResolvedSchema()).build();
+    }
+
+    @Override
+    public Map<String, String> getOptions() {
+        throw new TableException("A view backed by a query operation has no options.");
+    }
+
+    @Override
+    public String getComment() {
+        return queryOperation.asSummaryString();
+    }
+
+    @Override
     public QueryOperationCatalogView copy() {
-        return new QueryOperationCatalogView(this.queryOperation, getComment());
+        return new QueryOperationCatalogView(queryOperation);
     }
 
     @Override
@@ -65,4 +70,16 @@ public class QueryOperationCatalogView extends AbstractCatalogView {
     public Optional<String> getDetailedDescription() {
         return getDescription();
     }
+
+    @Override
+    public String getOriginalQuery() {
+        throw new TableException(
+                "A view backed by a query operation has no serializable representation.");
+    }
+
+    @Override
+    public String getExpandedQuery() {
+        throw new TableException(
+                "A view backed by a query operation has no serializable representation.");
+    }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
index d29e31e..9a7bca6 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
@@ -81,8 +81,11 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
 
     @Override
     public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
+        // only the top-level expressions may access the output data type
+        final SurroundingInfo surroundingInfo =
+                context.getOutputDataType().map(SurroundingInfo::of).orElse(null);
         return expression.stream()
-                .flatMap(expr -> expr.accept(new ResolvingCallVisitor(context, null)).stream())
+                .flatMap(e -> e.accept(new ResolvingCallVisitor(context, surroundingInfo)).stream())
                 .collect(Collectors.toList());
     }
 
@@ -120,23 +123,23 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
             // resolve the children with information from the current call
             final List<ResolvedExpression> resolvedArgs = new ArrayList<>();
             final int argCount = unresolvedCall.getChildren().size();
+
             for (int i = 0; i < argCount; i++) {
                 final int currentPos = i;
+                final SurroundingInfo surroundingInfo =
+                        typeInference
+                                .map(
+                                        inference ->
+                                                SurroundingInfo.of(
+                                                        name,
+                                                        definition,
+                                                        inference,
+                                                        argCount,
+                                                        currentPos,
+                                                        resolutionContext.isGroupedAggregation()))
+                                .orElse(null);
                 final ResolvingCallVisitor childResolver =
-                        new ResolvingCallVisitor(
-                                resolutionContext,
-                                typeInference
-                                        .map(
-                                                inference ->
-                                                        new SurroundingInfo(
-                                                                name,
-                                                                definition,
-                                                                inference,
-                                                                argCount,
-                                                                currentPos,
-                                                                resolutionContext
-                                                                        .isGroupedAggregation()))
-                                        .orElse(null));
+                        new ResolvingCallVisitor(resolutionContext, surroundingInfo);
                 resolvedArgs.addAll(unresolvedCall.getChildren().get(i).accept(childResolver));
             }
 
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
index 6e0edd6..316c24a 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
@@ -21,8 +21,11 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionIdentifier;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -39,6 +42,7 @@ import java.util.Arrays;
 import java.util.Collections;
 
 import static org.apache.flink.table.api.Expressions.callSql;
+import static org.apache.flink.table.api.Expressions.sourceWatermark;
 import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute;
 import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;
 import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isTimeAttribute;
@@ -90,23 +94,21 @@ public class SchemaResolutionTest {
 
     // the type of ts_ltz is TIMESTAMP_LTZ
     private static final String COMPUTED_SQL_WITH_TS_LTZ = "ts_ltz - INTERVAL '60' MINUTE";
+
     private static final ResolvedExpression COMPUTED_COLUMN_RESOLVED_WITH_TS_LTZ =
             new ResolvedExpressionMock(DataTypes.TIMESTAMP_LTZ(3), () -> COMPUTED_SQL_WITH_TS_LTZ);
+
     private static final String WATERMARK_SQL_WITH_TS_LTZ = "ts1 - INTERVAL '5' SECOND";
+
     private static final ResolvedExpression WATERMARK_RESOLVED_WITH_TS_LTZ =
             new ResolvedExpressionMock(DataTypes.TIMESTAMP_LTZ(3), () -> WATERMARK_SQL_WITH_TS_LTZ);
+
     private static final Schema SCHEMA_WITH_TS_LTZ =
             Schema.newBuilder()
-                    .primaryKeyNamed("primary_constraint", "id") // out of order
                     .column("id", DataTypes.INT().notNull())
-                    .column("counter", DataTypes.INT().notNull())
-                    .column("payload", "ROW<name STRING, age INT, flag BOOLEAN>")
-                    .columnByMetadata("topic", DataTypes.STRING(), true)
-                    .columnByExpression(
-                            "ts1", callSql(COMPUTED_SQL_WITH_TS_LTZ)) // out of order API expression
+                    .columnByExpression("ts1", callSql(COMPUTED_SQL_WITH_TS_LTZ))
                     .columnByMetadata("ts_ltz", DataTypes.TIMESTAMP_LTZ(3), "timestamp")
                     .watermark("ts1", WATERMARK_SQL_WITH_TS_LTZ)
-                    .columnByExpression("proctime", PROCTIME_SQL)
                     .build();
 
     @Test
@@ -152,39 +154,54 @@ public class SchemaResolutionTest {
                 new ResolvedSchema(
                         Arrays.asList(
                                 Column.physical("id", 
DataTypes.INT().notNull()),
-                                Column.physical("counter", 
DataTypes.INT().notNull()),
-                                Column.physical(
-                                        "payload",
-                                        DataTypes.ROW(
-                                                DataTypes.FIELD("name", 
DataTypes.STRING()),
-                                                DataTypes.FIELD("age", 
DataTypes.INT()),
-                                                DataTypes.FIELD("flag", 
DataTypes.BOOLEAN()))),
-                                Column.metadata("topic", DataTypes.STRING(), 
null, true),
                                 Column.computed("ts1", 
COMPUTED_COLUMN_RESOLVED_WITH_TS_LTZ),
                                 Column.metadata(
-                                        "ts_ltz", DataTypes.TIMESTAMP_LTZ(3), 
"timestamp", false),
-                                Column.computed("proctime", 
PROCTIME_RESOLVED)),
+                                        "ts_ltz", DataTypes.TIMESTAMP_LTZ(3), 
"timestamp", false)),
                         Collections.singletonList(
                                 WatermarkSpec.of("ts1", 
WATERMARK_RESOLVED_WITH_TS_LTZ)),
-                        UniqueConstraint.primaryKey(
-                                "primary_constraint", 
Collections.singletonList("id")));
+                        null);
 
         final ResolvedSchema actualStreamSchema = 
resolveSchema(SCHEMA_WITH_TS_LTZ, true);
         {
             assertThat(actualStreamSchema, equalTo(expectedSchema));
             assertTrue(isRowtimeAttribute(getType(actualStreamSchema, "ts1")));
-            assertTrue(isProctimeAttribute(getType(actualStreamSchema, 
"proctime")));
         }
 
         final ResolvedSchema actualBatchSchema = 
resolveSchema(SCHEMA_WITH_TS_LTZ, false);
         {
             assertThat(actualBatchSchema, equalTo(expectedSchema));
             assertFalse(isRowtimeAttribute(getType(actualBatchSchema, "ts1")));
-            assertTrue(isProctimeAttribute(getType(actualBatchSchema, 
"proctime")));
         }
     }
 
     @Test
+    public void testSchemaResolutionWithSourceWatermark() {
+        final ResolvedSchema expectedSchema =
+                new ResolvedSchema(
+                        Collections.singletonList(
+                                Column.physical("ts_ltz", DataTypes.TIMESTAMP_LTZ(1))),
+                        Collections.singletonList(
+                                WatermarkSpec.of(
+                                        "ts_ltz",
+                                        new CallExpression(
+                                                FunctionIdentifier.of(
+                                                        BuiltInFunctionDefinitions.SOURCE_WATERMARK
+                                                                .getName()),
+                                                BuiltInFunctionDefinitions.SOURCE_WATERMARK,
+                                                Collections.emptyList(),
+                                                DataTypes.TIMESTAMP_LTZ(1)))),
+                        null);
+        final ResolvedSchema resolvedSchema =
+                resolveSchema(
+                        Schema.newBuilder()
+                                .column("ts_ltz", DataTypes.TIMESTAMP_LTZ(1))
+                                .watermark("ts_ltz", sourceWatermark())
+                                .build());
+
+        assertThat(resolvedSchema, equalTo(expectedSchema));
+    }
+
+    @Test
     public void testSchemaResolutionErrors() {
 
         // columns
@@ -282,20 +299,6 @@ public class SchemaResolutionTest {
                                 + "  WATERMARK FOR `ts` AS [ts - INTERVAL '5' 
SECOND],\n"
                                 + "  CONSTRAINT `primary_constraint` PRIMARY 
KEY (`id`) NOT ENFORCED\n"
                                 + ")"));
-        assertThat(
-                SCHEMA_WITH_TS_LTZ.toString(),
-                equalTo(
-                        "(\n"
-                                + "  `id` INT NOT NULL,\n"
-                                + "  `counter` INT NOT NULL,\n"
-                                + "  `payload` [ROW<name STRING, age INT, flag 
BOOLEAN>],\n"
-                                + "  `topic` METADATA VIRTUAL,\n"
-                                + "  `ts1` AS [ts_ltz - INTERVAL '60' 
MINUTE],\n"
-                                + "  `ts_ltz` METADATA FROM 'timestamp',\n"
-                                + "  `proctime` AS [PROCTIME()],\n"
-                                + "  WATERMARK FOR `ts1` AS [ts1 - INTERVAL 
'5' SECOND],\n"
-                                + "  CONSTRAINT `primary_constraint` PRIMARY 
KEY (`id`) NOT ENFORCED\n"
-                                + ")"));
     }
 
     @Test
@@ -315,22 +318,6 @@ public class SchemaResolutionTest {
                                 + "  WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - 
INTERVAL '5' SECOND,\n"
                                 + "  CONSTRAINT `primary_constraint` PRIMARY 
KEY (`id`) NOT ENFORCED\n"
                                 + ")"));
-
-        final ResolvedSchema resolvedSchemaWithTsLtz = 
resolveSchema(SCHEMA_WITH_TS_LTZ);
-        assertThat(
-                resolvedSchemaWithTsLtz.toString(),
-                equalTo(
-                        "(\n"
-                                + "  `id` INT NOT NULL,\n"
-                                + "  `counter` INT NOT NULL,\n"
-                                + "  `payload` ROW<`name` STRING, `age` INT, 
`flag` BOOLEAN>,\n"
-                                + "  `topic` STRING METADATA VIRTUAL,\n"
-                                + "  `ts1` TIMESTAMP_LTZ(3) *ROWTIME* AS 
ts_ltz - INTERVAL '60' MINUTE,\n"
-                                + "  `ts_ltz` TIMESTAMP_LTZ(3) METADATA FROM 
'timestamp',\n"
-                                + "  `proctime` TIMESTAMP_LTZ(3) NOT NULL 
*PROCTIME* AS PROCTIME(),\n"
-                                + "  WATERMARK FOR `ts1`: TIMESTAMP_LTZ(3) AS 
ts1 - INTERVAL '5' SECOND,\n"
-                                + "  CONSTRAINT `primary_constraint` PRIMARY 
KEY (`id`) NOT ENFORCED\n"
-                                + ")"));
     }
 
     @Test
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
index 552ca9d..dd655b0 100644
--- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
@@ -20,10 +20,11 @@ package org.apache.flink.table.api
 
 import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark
 import org.apache.flink.table.expressions.ApiExpressionUtils.{unresolvedCall, unresolvedRef, valueLiteral}
 import org.apache.flink.table.expressions.{ApiExpressionUtils, Expression, TableSymbol, TimePointUnit}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{DISTINCT, RANGE_TO}
-import org.apache.flink.table.functions.{ScalarFunction, TableFunction, ImperativeAggregateFunction, UserDefinedFunctionHelper, _}
+import org.apache.flink.table.functions.{ImperativeAggregateFunction, ScalarFunction, TableFunction, UserDefinedFunctionHelper, _}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
 
@@ -464,12 +465,9 @@ trait ImplicitExpressionConversions {
   /**
    * Converts a numeric type epoch time to [[DataTypes#TIMESTAMP_LTZ]].
    *
-   * <p>The supported precision is 0 or 3:
-   *
-   * <ul>
-   *   <li>0 means the numericEpochTime is in second.
-   *   <li>3 means the numericEpochTime is in millisecond.
-   * </ul>
+   * The supported precision is 0 or 3:
+   *   - 0 means the numericEpochTime is in second.
+   *   - 3 means the numericEpochTime is in millisecond.
    */
   def toTimestampLtz(numericEpochTime: Expression, precision: Expression): Expression = {
     Expressions.toTimestampLtz(numericEpochTime, precision)
@@ -682,6 +680,21 @@ trait ImplicitExpressionConversions {
   }
 
   /**
+   * Source watermark declaration for [[Schema]].
+   *
+   * This is a marker function that doesn't have concrete runtime implementation.
+   * It can only be used as a single expression in [[Schema.Builder#watermark(String, Expression)]].
+   * The declaration will be pushed down into a table source that implements the
+   * [[SupportsSourceWatermark]] interface. The source will emit system-defined watermarks
+   * afterwards.
+   *
+   * Please check the documentation whether the connector supports source watermarks.
+   */
+  def sourceWatermark(): Expression = {
+    Expressions.sourceWatermark()
+  }
+
+  /**
     * Ternary conditional operator that decides which of two other expressions should be evaluated
     * based on a evaluated boolean condition.
     *
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java
index 1a045ae..6e36e31 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java
@@ -216,63 +216,50 @@ public final class TypeInferenceUtil {
      *
      * @see CallContext#getOutputDataType()
      */
-    public static final class SurroundingInfo {
+    public interface SurroundingInfo {
 
-        private final String name;
-
-        private final FunctionDefinition functionDefinition;
-
-        private final TypeInference typeInference;
-
-        private final int argumentCount;
-
-        private final int innerCallPosition;
-
-        private final boolean isGroupedAggregation;
-
-        public SurroundingInfo(
+        static SurroundingInfo of(
                 String name,
                 FunctionDefinition functionDefinition,
                 TypeInference typeInference,
                 int argumentCount,
                 int innerCallPosition,
                 boolean isGroupedAggregation) {
-            this.name = name;
-            this.functionDefinition = functionDefinition;
-            this.typeInference = typeInference;
-            this.argumentCount = argumentCount;
-            this.innerCallPosition = innerCallPosition;
-            this.isGroupedAggregation = isGroupedAggregation;
+            return typeFactory -> {
+                final boolean isValidCount =
+                        validateArgumentCount(
+                                typeInference.getInputTypeStrategy().getArgumentCount(),
+                                argumentCount,
+                                false);
+                if (!isValidCount) {
+                    return Optional.empty();
+                }
+                // for "takes_string(this_function(NULL))" simulate "takes_string(NULL)"
+                // for retrieving the output type of "this_function(NULL)"
+                final CallContext callContext =
+                        new UnknownCallContext(
+                                typeFactory,
+                                name,
+                                functionDefinition,
+                                argumentCount,
+                                isGroupedAggregation);
+
+                // We might not be able to infer the input types at this moment, if the surrounding
+                // function does not provide an explicit input type strategy.
+                final CallContext adaptedContext =
+                        adaptArguments(typeInference, callContext, null, false);
+                return typeInference
+                        .getInputTypeStrategy()
+                        .inferInputTypes(adaptedContext, false)
+                        .map(dataTypes -> dataTypes.get(innerCallPosition));
+            };
         }
 
-        private Optional<DataType> inferOutputType(DataTypeFactory typeFactory) {
-            final boolean isValidCount =
-                    validateArgumentCount(
-                            typeInference.getInputTypeStrategy().getArgumentCount(),
-                            argumentCount,
-                            false);
-            if (!isValidCount) {
-                return Optional.empty();
-            }
-            // for "takes_string(this_function(NULL))" simulate "takes_string(NULL)"
-            // for retrieving the output type of "this_function(NULL)"
-            final CallContext callContext =
-                    new UnknownCallContext(
-                            typeFactory,
-                            name,
-                            functionDefinition,
-                            argumentCount,
-                            isGroupedAggregation);
-
-            // We might not be able to infer the input types at this moment, if the surrounding
-            // function does not provide an explicit input type strategy.
-            final CallContext adaptedContext =
-                    adaptArguments(typeInference, callContext, null, false);
-            return typeInference
-                    .getInputTypeStrategy()
-                    .inferInputTypes(adaptedContext, false)
-                    .map(dataTypes -> dataTypes.get(innerCallPosition));
+        static SurroundingInfo of(DataType dataType) {
+            return typeFactory -> Optional.of(dataType);
         }
+
+        Optional<DataType> inferOutputType(DataTypeFactory typeFactory);
     }
 
     /**
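
A side note on the refactoring above (sketch only; the wrapper class and variable names
are illustrative): since SurroundingInfo is now an interface with a single abstract
method, inferOutputType(DataTypeFactory), it can be expressed as a lambda, which is
exactly what the new static factories return.

    import java.util.Optional;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.inference.TypeInferenceUtil;

    public class SurroundingInfoSketch {
        public static void main(String[] args) {
            // A surrounding context with a fixed output type, written directly
            // as a lambda over the DataTypeFactory argument.
            final TypeInferenceUtil.SurroundingInfo fixed =
                    typeFactory -> Optional.of(DataTypes.STRING());

            // Equivalent to the factory used by ResolveCallByArgumentsRule for
            // top-level expressions that already know their output data type.
            final TypeInferenceUtil.SurroundingInfo viaFactory =
                    TypeInferenceUtil.SurroundingInfo.of(DataTypes.STRING());

            System.out.println(fixed != null && viaFactory != null);
        }
    }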
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java
index 8e9f66d..52786ef 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java
@@ -117,7 +117,7 @@ public abstract class InputTypeStrategiesTestBase {
                             .outputTypeStrategy(TypeStrategies.MISSING)
                             .build();
             surroundingInfo =
-                    new TypeInferenceUtil.SurroundingInfo(
+                    TypeInferenceUtil.SurroundingInfo.of(
                             "f_outer",
                             functionDefinitionMock,
                             outerTypeInference,
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
index 08d9959..c5ea4ab 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
@@ -20,30 +20,29 @@ package org.apache.flink.table.planner.catalog;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogManager.TableLookupResult;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ConnectorCatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
 import org.apache.flink.table.planner.sources.TableSourceUtil;
+import org.apache.flink.table.runtime.types.PlannerTypeUtils;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceValidation;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.TimestampType;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -52,9 +51,7 @@ import org.apache.calcite.schema.impl.AbstractTable;
 
 import java.util.List;
 import java.util.Optional;
-
-import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;
-import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isTimeAttribute;
+import java.util.stream.Collectors;
 
 /**
  * Represents a wrapper for {@link CatalogBaseTable} in {@link org.apache.calcite.schema.Schema}.
@@ -124,34 +121,7 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
     @Override
     public RelDataType getRowType(RelDataTypeFactory typeFactory) {
         final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory;
-        TableSchema tableSchema = TableSchema.fromResolvedSchema(lookupResult.getResolvedSchema());
-        final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
-        CatalogBaseTable catalogTable = lookupResult.getTable();
-        if (!isStreamingMode
-                && catalogTable instanceof ConnectorCatalogTable
-                && ((ConnectorCatalogTable<?, ?>) catalogTable).getTableSource().isPresent()) {
-            // If the table source is bounded, materialize the time attributes to normal TIMESTAMP
-            // type.
-            // Now for ConnectorCatalogTable, there is no way to
-            // deduce if it is bounded in the table environment, so the data types in TableSchema
-            // always patched with TimeAttribute.
-            // See ConnectorCatalogTable#calculateSourceSchema
-            // for details.
-
-            // Remove the patched time attributes type to let the TableSourceTable handle it.
-            // We should remove this logic if the isBatch flag in ConnectorCatalogTable is fixed.
-            // TODO: Fix FLINK-14844.
-            for (int i = 0; i < fieldDataTypes.length; i++) {
-                LogicalType lt = fieldDataTypes[i].getLogicalType();
-                if (lt instanceof TimestampType && isRowtimeAttribute(lt)) {
-                    int precision = ((TimestampType) lt).getPrecision();
-                    fieldDataTypes[i] = DataTypes.TIMESTAMP(precision);
-                } else if (lt instanceof LocalZonedTimestampType && isTimeAttribute(lt)) {
-                    int precision = ((LocalZonedTimestampType) lt).getPrecision();
-                    fieldDataTypes[i] = DataTypes.TIMESTAMP_LTZ(precision);
-                }
-            }
-        }
+        final ResolvedSchema schema = lookupResult.getResolvedSchema();
 
         // The following block is a workaround to support tables defined by
         // TableEnvironment.connect() and
@@ -159,9 +129,10 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
         // It should be removed after we remove DefinedProctimeAttribute/DefinedRowtimeAttributes.
         Optional<TableSource<?>> sourceOpt = findAndCreateTableSource();
         if (isStreamingMode
-                && tableSchema.getTableColumns().stream().allMatch(TableColumn::isPhysical)
-                && tableSchema.getWatermarkSpecs().isEmpty()
+                && schema.getColumns().stream().allMatch(Column::isPhysical)
+                && schema.getWatermarkSpecs().isEmpty()
                 && sourceOpt.isPresent()) {
+            TableSchema tableSchema = TableSchema.fromResolvedSchema(schema);
             TableSource<?> source = sourceOpt.get();
             if (TableSourceValidation.hasProctimeAttribute(source)
                     || TableSourceValidation.hasRowtimeAttribute(source)) {
@@ -171,10 +142,17 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
                 // ConnectorCatalogTable#calculateSourceSchema
                 tableSchema = ConnectorCatalogTable.calculateSourceSchema(source, false);
             }
+            return TableSourceUtil.getSourceRowType(
+                    flinkTypeFactory, tableSchema, scala.Option.empty(), true);
         }
 
-        return TableSourceUtil.getSourceRowType(
-                flinkTypeFactory, tableSchema, scala.Option.empty(), isStreamingMode);
+        final List<String> fieldNames = schema.getColumnNames();
+        final List<LogicalType> fieldTypes =
+                schema.getColumnDataTypes().stream()
+                        .map(DataType::getLogicalType)
+                        .map(PlannerTypeUtils::removeLegacyTypes)
+                        .collect(Collectors.toList());
+        return flinkTypeFactory.buildRelNodeRowType(fieldNames, fieldTypes);
     }
 
     @Override
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index b732265..6222b23 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -20,13 +20,12 @@ package org.apache.flink.table.planner.connectors;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableColumn.MetadataColumn;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.Column.MetadataColumn;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -184,7 +183,7 @@ public final class DynamicSinkUtils {
         final DataTypeFactory dataTypeFactory =
                 unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
         final FlinkTypeFactory typeFactory = unwrapTypeFactory(relBuilder);
-        final TableSchema schema = table.getSchema();
+        final ResolvedSchema schema = table.getResolvedSchema();
 
         List<SinkAbilitySpec> sinkAbilitySpecs = new ArrayList<>();
 
@@ -230,7 +229,7 @@ public final class DynamicSinkUtils {
      */
     public static RelNode validateSchemaAndApplyImplicitCast(
             RelNode query,
-            TableSchema sinkSchema,
+            ResolvedSchema sinkSchema,
             @Nullable ObjectIdentifier sinkIdentifier,
             DataTypeFactory dataTypeFactory,
             FlinkTypeFactory typeFactory) {
@@ -239,7 +238,7 @@ public final class DynamicSinkUtils {
 
         final RowType sinkType =
                 (RowType)
-                        fixSinkDataType(dataTypeFactory, sinkSchema.toPersistedRowDataType())
+                        fixSinkDataType(dataTypeFactory, sinkSchema.toSinkRowDataType())
                                 .getLogicalType();
         final List<RowField> sinkFields = sinkType.getFields();
 
@@ -284,10 +283,10 @@ public final class DynamicSinkUtils {
     private static void pushMetadataProjection(
             FlinkRelBuilder relBuilder,
             FlinkTypeFactory typeFactory,
-            TableSchema schema,
+            ResolvedSchema schema,
             DynamicTableSink sink) {
         final RexBuilder rexBuilder = relBuilder.getRexBuilder();
-        final List<TableColumn> tableColumns = schema.getTableColumns();
+        final List<Column> columns = schema.getColumns();
 
         final List<Integer> physicalColumns = extractPhysicalColumns(schema);
 
@@ -297,9 +296,9 @@ public final class DynamicSinkUtils {
                                 Collectors.toMap(
                                         pos -> {
                                             final MetadataColumn 
metadataColumn =
-                                                    (MetadataColumn) 
tableColumns.get(pos);
+                                                    (MetadataColumn) 
columns.get(pos);
                                             return metadataColumn
-                                                    .getMetadataAlias()
+                                                    .getMetadataKey()
                                                     
.orElse(metadataColumn.getName());
                                         },
                                         Function.identity()));
@@ -311,13 +310,11 @@ public final class DynamicSinkUtils {
 
         final List<String> fieldNames =
                 Stream.concat(
-                                physicalColumns.stream()
-                                        .map(tableColumns::get)
-                                        .map(TableColumn::getName),
+                                physicalColumns.stream().map(columns::get).map(Column::getName),
                                 metadataColumns.stream()
-                                        .map(tableColumns::get)
+                                        .map(columns::get)
                                         .map(MetadataColumn.class::cast)
-                                        .map(c -> c.getMetadataAlias().orElse(c.getName())))
+                                        .map(c -> c.getMetadataKey().orElse(c.getName())))
                         .collect(Collectors.toList());
 
         final Map<String, DataType> metadataMap = extractMetadataMap(sink);
@@ -328,18 +325,17 @@ public final class DynamicSinkUtils {
                                         .map(
                                                 pos -> {
                                                     final int posAdjusted =
-                                                            adjustByVirtualColumns(
-                                                                    tableColumns, pos);
+                                                            adjustByVirtualColumns(columns, pos);
                                                     return relBuilder.field(posAdjusted);
                                                 }),
                                 metadataColumns.stream()
                                         .map(
                                                 pos -> {
                                                     final MetadataColumn metadataColumn =
-                                                            (MetadataColumn) tableColumns.get(pos);
+                                                            (MetadataColumn) columns.get(pos);
                                                     final String metadataKey =
                                                             metadataColumn
-                                                                    .getMetadataAlias()
+                                                                    .getMetadataKey()
                                                                     .orElse(
                                                                             metadataColumn
                                                                                     .getName());
@@ -354,8 +350,7 @@ public final class DynamicSinkUtils {
                                                                             expectedType);

                                                     final int posAdjusted =
-                                                            adjustByVirtualColumns(
-                                                                    tableColumns, pos);
+                                                            adjustByVirtualColumns(columns, pos);
                                                     return rexBuilder.makeAbstractCast(
                                                             expectedRelDataType,
                                                             relBuilder.field(posAdjusted));
@@ -374,13 +369,13 @@ public final class DynamicSinkUtils {
             Map<String, String> staticPartitions,
             boolean isOverwrite,
             DynamicTableSink sink,
-            CatalogTable table,
+            ResolvedCatalogTable table,
             List<SinkAbilitySpec> sinkAbilitySpecs) {
         validatePartitioning(sinkIdentifier, staticPartitions, sink, table.getPartitionKeys());

         validateAndApplyOverwrite(sinkIdentifier, isOverwrite, sink, sinkAbilitySpecs);

-        validateAndApplyMetadata(sinkIdentifier, sink, table.getSchema(), sinkAbilitySpecs);
+        validateAndApplyMetadata(sinkIdentifier, sink, table.getResolvedSchema(), sinkAbilitySpecs);
     }
 
     /**
@@ -391,15 +386,15 @@ public final class DynamicSinkUtils {
      * #prepareDynamicSink}.
      */
     private static List<String> createRequiredMetadataKeys(
-            TableSchema schema, DynamicTableSink sink) {
-        final List<TableColumn> tableColumns = schema.getTableColumns();
+            ResolvedSchema schema, DynamicTableSink sink) {
+        final List<Column> tableColumns = schema.getColumns();
         final List<Integer> metadataColumns = extractPersistedMetadataColumns(schema);
 
         final Set<String> requiredMetadataKeys =
                 metadataColumns.stream()
                         .map(tableColumns::get)
                         .map(MetadataColumn.class::cast)
-                        .map(c -> c.getMetadataAlias().orElse(c.getName()))
+                        .map(c -> c.getMetadataKey().orElse(c.getName()))
                         .collect(Collectors.toSet());
 
         final Map<String, DataType> metadataMap = extractMetadataMap(sink);
@@ -506,33 +501,29 @@ public final class DynamicSinkUtils {
         sinkAbilitySpecs.add(new OverwriteSpec(true));
     }
 
-    private static List<Integer> extractPhysicalColumns(TableSchema schema) {
-        final List<TableColumn> tableColumns = schema.getTableColumns();
-        return IntStream.range(0, schema.getFieldCount())
-                .filter(pos -> tableColumns.get(pos).isPhysical())
+    private static List<Integer> extractPhysicalColumns(ResolvedSchema schema) {
+        final List<Column> columns = schema.getColumns();
+        return IntStream.range(0, schema.getColumnCount())
+                .filter(pos -> columns.get(pos).isPhysical())
                 .boxed()
                 .collect(Collectors.toList());
     }
 
-    private static List<Integer> extractPersistedMetadataColumns(TableSchema schema) {
-        final List<TableColumn> tableColumns = schema.getTableColumns();
-        return IntStream.range(0, schema.getFieldCount())
+    private static List<Integer> extractPersistedMetadataColumns(ResolvedSchema schema) {
+        final List<Column> columns = schema.getColumns();
+        return IntStream.range(0, schema.getColumnCount())
                 .filter(
                         pos -> {
-                            final TableColumn tableColumn = tableColumns.get(pos);
-                            return tableColumn instanceof MetadataColumn
-                                    && tableColumn.isPersisted();
+                            final Column column = columns.get(pos);
+                            return column instanceof MetadataColumn && column.isPersisted();
                         })
                 .boxed()
                 .collect(Collectors.toList());
     }
 
-    private static int adjustByVirtualColumns(List<TableColumn> tableColumns, int pos) {
+    private static int adjustByVirtualColumns(List<Column> columns, int pos) {
         return pos
-                - (int)
-                        IntStream.range(0, pos)
-                                .filter(i -> !tableColumns.get(i).isPersisted())
-                                .count();
+                - (int) IntStream.range(0, pos).filter(i -> !columns.get(i).isPersisted()).count();
     }
 
     private static Map<String, DataType> extractMetadataMap(DynamicTableSink sink) {
@@ -545,9 +536,9 @@ public final class DynamicSinkUtils {
     private static void validateAndApplyMetadata(
             ObjectIdentifier sinkIdentifier,
             DynamicTableSink sink,
-            TableSchema schema,
+            ResolvedSchema schema,
             List<SinkAbilitySpec> sinkAbilitySpecs) {
-        final List<TableColumn> tableColumns = schema.getTableColumns();
+        final List<Column> columns = schema.getColumns();
         final List<Integer> metadataColumns = extractPersistedMetadataColumns(schema);
 
         if (metadataColumns.isEmpty()) {
@@ -569,10 +560,10 @@ public final class DynamicSinkUtils {
                 ((SupportsWritingMetadata) sink).listWritableMetadata();
         metadataColumns.forEach(
                 pos -> {
-                    final MetadataColumn metadataColumn = (MetadataColumn) tableColumns.get(pos);
+                    final MetadataColumn metadataColumn = (MetadataColumn) columns.get(pos);
                     final String metadataKey =
-                            metadataColumn.getMetadataAlias().orElse(metadataColumn.getName());
-                    final LogicalType metadataType = metadataColumn.getType().getLogicalType();
+                            metadataColumn.getMetadataKey().orElse(metadataColumn.getName());
+                    final LogicalType metadataType = metadataColumn.getDataType().getLogicalType();
                     final DataType expectedMetadataDataType = metadataMap.get(metadataKey);
                     // check that metadata key is valid
                     if (expectedMetadataDataType == null) {
@@ -626,13 +617,13 @@ public final class DynamicSinkUtils {
      *
      * <p>The format looks as follows: {@code PHYSICAL COLUMNS + PERSISTED METADATA COLUMNS}
      */
-    private static RowType createConsumedType(TableSchema schema, DynamicTableSink sink) {
+    private static RowType createConsumedType(ResolvedSchema schema, DynamicTableSink sink) {
         final Map<String, DataType> metadataMap = extractMetadataMap(sink);
 
         final Stream<RowField> physicalFields =
-                schema.getTableColumns().stream()
-                        .filter(TableColumn::isPhysical)
-                        .map(c -> new RowField(c.getName(), c.getType().getLogicalType()));
+                schema.getColumns().stream()
+                        .filter(Column::isPhysical)
+                        .map(c -> new RowField(c.getName(), c.getDataType().getLogicalType()));
 
         final Stream<RowField> metadataFields =
                 createRequiredMetadataKeys(schema, sink).stream()
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
index 800eca6..4818887 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.Column.ComputedColumn;
 import org.apache.flink.table.catalog.Column.MetadataColumn;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -239,7 +240,7 @@ public final class DynamicSourceUtils {
         boolean changeEventsDuplicate =
                 config.getConfiguration()
                         .getBoolean(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE);
-        boolean hasPrimaryKey = catalogTable.getSchema().getPrimaryKey().isPresent();
+        boolean hasPrimaryKey = catalogTable.getResolvedSchema().getPrimaryKey().isPresent();
         return isCDCSource && changeEventsDuplicate && hasPrimaryKey;
     }
 
@@ -268,9 +269,8 @@ public final class DynamicSourceUtils {
                 schema.getColumns().stream()
                         .map(
                                 c -> {
-                                    if (c instanceof Column.ComputedColumn) {
-                                        final Column.ComputedColumn computedColumn =
-                                                (Column.ComputedColumn) c;
+                                    if (c instanceof ComputedColumn) {
+                                        final ComputedColumn computedColumn = (ComputedColumn) c;
                                         return computedColumn.getExpression().accept(converter);
                                     } else {
                                         return relBuilder.field(c.getName());
@@ -296,13 +296,13 @@ public final class DynamicSourceUtils {
 
         final List<String> fieldNames =
                 schema.getColumns().stream()
-                        .filter(c -> !(c instanceof Column.ComputedColumn))
+                        .filter(c -> !(c instanceof ComputedColumn))
                         .map(Column::getName)
                         .collect(Collectors.toList());
 
         final List<RexNode> fieldNodes =
                 schema.getColumns().stream()
-                        .filter(c -> !(c instanceof Column.ComputedColumn))
+                        .filter(c -> !(c instanceof ComputedColumn))
                         .map(
                                 c -> {
                                     final RelDataType relDataType =
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java
index 81a8c49..c95adfd 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.types.logical.TimeType;
 import org.apache.calcite.avatica.util.ByteString;
 import org.apache.calcite.avatica.util.TimeUnit;
 import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
@@ -212,7 +213,19 @@ public class ExpressionConverter implements ExpressionVisitor<RexNode> {
         if (other instanceof RexNodeExpression) {
             return ((RexNodeExpression) other).getRexNode();
         } else if (other instanceof LocalReferenceExpression) {
-            LocalReferenceExpression local = (LocalReferenceExpression) other;
+            final LocalReferenceExpression local = (LocalReferenceExpression) other;
+            // check whether the local field reference can actually be resolved to an existing
+            // field otherwise preserve the locality attribute
+            RelNode inputNode;
+            try {
+                inputNode = relBuilder.peek();
+            } catch (Throwable t) {
+                inputNode = null;
+            }
+            if (inputNode != null
+                    && inputNode.getRowType().getFieldNames().contains(local.getName())) {
+                return relBuilder.field(local.getName());
+            }
             return new RexFieldVariable(
                     local.getName(),
                     typeFactory.createFieldTypeFromLogicalType(
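
For context, the fallback above is what lets field references inside Table API schema
expressions resolve against the actual source row type. A minimal, hedged sketch of the
resulting user-facing behaviour (illustrative only, not part of the patch; it assumes a
DataStream "dataStream" whose row type contains fields "name" (STRING) and "ts"
(TIMESTAMP_LTZ(3)), plus static imports of Expressions.$ and Expressions.lit):

    // Both the computed column and the watermark are Table API expressions; $("name")
    // and $("ts") now resolve to existing input fields instead of remaining unresolved
    // local references.
    Table table =
            tableEnv.fromDataStream(
                    dataStream,
                    Schema.newBuilder()
                            .columnByExpression("upper_name", $("name").upperCase())
                            .watermark("ts", $("ts").minus(lit(5).seconds()))
                            .build());
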
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index cb700b6..90a0960 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -35,8 +35,9 @@ import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
@@ -52,7 +53,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
-import org.apache.flink.table.planner.sinks.TableSinkUtils;
 import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer;
@@ -60,16 +60,18 @@ import org.apache.flink.table.runtime.operators.sink.SinkOperator;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.types.RowKind;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -114,8 +116,9 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
         final DynamicTableSink tableSink = tableSinkSpec.getTableSink();
         final DynamicTableSink.SinkRuntimeProvider runtimeProvider =
                 tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded));
-        final TableSchema tableSchema = tableSinkSpec.getCatalogTable().getSchema();
-        inputTransform = applyNotNullEnforcer(tableConfig, tableSchema, inputTransform);
+        final ResolvedSchema schema = tableSinkSpec.getCatalogTable().getResolvedSchema();
+        final RowType consumedRowType = getConsumedRowType(schema);
+        inputTransform = applyNotNullEnforcer(tableConfig, consumedRowType, inputTransform);
 
         if (runtimeProvider instanceof DataStreamSinkProvider) {
             if (runtimeProvider instanceof ParallelismProvider) {
@@ -147,7 +150,11 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
             // apply keyBy partition transformation if needed
             inputTransform =
                     applyKeyByForDifferentParallelism(
-                            tableSchema, inputTransform, inputParallelism, sinkParallelism);
+                            consumedRowType,
+                            schema.getPrimaryKey().orElse(null),
+                            inputTransform,
+                            inputParallelism,
+                            sinkParallelism);
 
             final SinkFunction<RowData> sinkFunction;
             if (runtimeProvider instanceof SinkFunctionProvider) {
@@ -179,15 +186,12 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
      * Apply an operator to filter or report error to process not-null values for not-null fields.
      */
     private Transformation<RowData> applyNotNullEnforcer(
-            TableConfig config, TableSchema tableSchema, Transformation<RowData> inputTransform) {
+            TableConfig config, RowType consumedRowType, Transformation<RowData> inputTransform) {
         final ExecutionConfigOptions.NotNullEnforcer notNullEnforcer =
                 config.getConfiguration()
                         .get(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER);
-        final int[] notNullFieldIndices = TableSinkUtils.getNotNullFieldIndices(tableSchema);
-        final String[] fieldNames =
-                ((RowType) tableSchema.toPhysicalRowDataType().getLogicalType())
-                        .getFieldNames()
-                        .toArray(new String[0]);
+        final int[] notNullFieldIndices = getNotNullFieldIndices(consumedRowType);
+        final String[] fieldNames = consumedRowType.getFieldNames().toArray(new String[0]);
 
         if (notNullFieldIndices.length > 0) {
             final SinkNotNullEnforcer enforcer =
@@ -211,6 +215,12 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
         }
     }
 
+    private int[] getNotNullFieldIndices(RowType consumedType) {
+        return IntStream.range(0, consumedType.getFieldCount())
+                .filter(pos -> !consumedType.getTypeAt(pos).isNullable())
+                .toArray();
+    }
+
     /**
      * Returns the parallelism of sink operator, it assumes the sink runtime provider implements
      * {@link ParallelismProvider}. It returns parallelism defined in {@link ParallelismProvider} if
@@ -242,11 +252,12 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
      * ordering of changelog messages.
      */
     private Transformation<RowData> applyKeyByForDifferentParallelism(
-            TableSchema tableSchema,
+            RowType sinkRowType,
+            @Nullable UniqueConstraint primaryKey,
             Transformation<RowData> inputTransform,
             int inputParallelism,
             int sinkParallelism) {
-        final int[] primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(tableSchema);
+        final int[] primaryKeys = getPrimaryKeyIndices(sinkRowType, primaryKey);
         if (inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT)) {
             // if the inputParallelism is equals to the parallelism or insert-only mode, do nothing.
             return inputTransform;
@@ -276,6 +287,13 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
         }
     }
 
+    private int[] getPrimaryKeyIndices(RowType sinkRowType, @Nullable UniqueConstraint primaryKey) {
+        if (primaryKey == null) {
+            return new int[0];
+        }
+        return primaryKey.getColumns().stream().mapToInt(sinkRowType::getFieldIndex).toArray();
+    }
+
     private Transformation<Object> createSinkFunctionTransformation(
             SinkFunction<RowData> sinkFunction,
             StreamExecutionEnvironment env,
@@ -299,4 +317,8 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
     private InternalTypeInfo<RowData> getInputTypeInfo() {
         return InternalTypeInfo.of(getInputEdges().get(0).getOutputType());
     }
+
+    private RowType getConsumedRowType(ResolvedSchema schema) {
+        return (RowType) schema.toSinkRowDataType().getLogicalType();
+    }
 }
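
For reference, a short sketch of what the new helpers compute (illustrative only, not part
of the patch; "schema" and "consumedRowType" stand for the values derived in
createSinkTransformation above). toSinkRowDataType() keeps physical and persisted metadata
columns but drops computed columns, which is why it replaces the legacy physical TableSchema
here.

    // Field positions whose type is declared NOT NULL in the consumed row type.
    int[] notNullIndices =
            IntStream.range(0, consumedRowType.getFieldCount())
                    .filter(pos -> !consumedRowType.getTypeAt(pos).isNullable())
                    .toArray();

    // Primary key positions, or an empty array if the resolved schema declares none.
    int[] primaryKeyIndices =
            schema.getPrimaryKey()
                    .map(pk -> pk.getColumns().stream()
                            .mapToInt(consumedRowType::getFieldIndex)
                            .toArray())
                    .orElseGet(() -> new int[0]);
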
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
index f9d526b..cf52dd8 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.utils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.planner.calcite.FlinkContext;
@@ -88,6 +89,13 @@ public final class ShortcutUtils {
 
     public static @Nullable FunctionDefinition unwrapFunctionDefinition(
             ResolvedExpression expression) {
+        // Table API expression
+        if (expression instanceof CallExpression) {
+            final CallExpression callExpression = (CallExpression) expression;
+            return callExpression.getFunctionDefinition();
+        }
+
+        // SQL expression
         if (!(expression instanceof RexNodeExpression)) {
             return null;
         }
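
A hedged usage sketch of why the extra branch matters (illustrative only, not part of the
patch; "watermarkExpression" is an assumed ResolvedExpression taken from a resolved schema):
watermark and column expressions declared through the Table API arrive as CallExpression
rather than RexNodeExpression, so callers can now detect SOURCE_WATERMARK() uniformly.

    // Works for both SQL-originated and Table-API-originated expressions after this change.
    FunctionDefinition definition =
            ShortcutUtils.unwrapFunctionDefinition(watermarkExpression);
    boolean usesSourceWatermark =
            definition == BuiltInFunctionDefinitions.SOURCE_WATERMARK;
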
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
index 24a5b50..18ce788 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
@@ -225,6 +225,22 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem)
   }
 
   /**
+   * Creates a table row type with the given field names and field types. Table row type is table
+   * schema for Calcite [[RelNode]]. See [[RelNode#getRowType]].
+   *
+   * It uses [[StructKind#FULLY_QUALIFIED]] so that each field must be referenced explicitly.
+   *
+   * @param fieldNames field names
+   * @param fieldTypes field types, every element is Flink's [[LogicalType]]
+   * @return a table row type with the input fieldNames, input fieldTypes.
+   */
+  def buildRelNodeRowType(
+      fieldNames: util.List[String],
+      fieldTypes: util.List[LogicalType]): RelDataType = {
+    buildStructType(fieldNames, fieldTypes, StructKind.FULLY_QUALIFIED)
+  }
+
+  /**
     * Creates a table row type with the input fieldNames and input fieldTypes using
     * FlinkTypeFactory. Table row type is table schema for Calcite RelNode. See getRowType of
     * [[RelNode]]. Use FULLY_QUALIFIED to let each field must be referenced explicitly.
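
A possible Java-side use of the new overload (illustrative only, not part of the patch;
"typeFactory" is an assumed FlinkTypeFactory and "schema" an assumed ResolvedSchema):

    // Build the Calcite row type directly from the resolved columns, without converting
    // the Java lists to Scala collections first.
    RelDataType rowType =
            typeFactory.buildRelNodeRowType(
                    schema.getColumnNames(),
                    schema.getColumnDataTypes().stream()
                            .map(DataType::getLogicalType)
                            .collect(Collectors.toList()));
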
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index e676771..c16ab9e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -51,20 +51,18 @@ import org.apache.flink.table.planner.utils.InternalConfigOptions.{TABLE_QUERY_S
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
-import org.apache.flink.table.utils.TableSchemaUtils
 
 import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.plan.{RelTrait, RelTraitDef}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.tools.FrameworkConfig
 
 import java.lang.{Long => JLong}
 import java.util
 import java.util.TimeZone
 
-import org.apache.calcite.rel.hint.RelHint
-
 import _root_.scala.collection.JavaConversions._
 
 /**
@@ -182,7 +180,7 @@ abstract class PlannerBase(
         // validate query schema and sink schema, and apply cast if possible
         val query = validateSchemaAndApplyImplicitCast(
           input,
-          sinkSchema,
+          catalogManager.getSchemaResolver.resolve(sinkSchema.toSchema),
           null,
           dataTypeFactory,
           getTypeFactory)
@@ -211,7 +209,7 @@ abstract class PlannerBase(
             // validate query schema and sink schema, and apply cast if possible
             val query = validateSchemaAndApplyImplicitCast(
               input,
-              TableSchemaUtils.getPhysicalSchema(table.getSchema),
+              table.getResolvedSchema,
               catalogSink.getTableIdentifier,
               dataTypeFactory,
               getTypeFactory)
@@ -256,7 +254,7 @@ abstract class PlannerBase(
         // validate query schema and sink schema, and apply cast if possible
         val query = validateSchemaAndApplyImplicitCast(
           input,
-          sinkPhysicalSchema,
+          catalogManager.getSchemaResolver.resolve(sinkPhysicalSchema.toSchema),
           null,
           dataTypeFactory,
           getTypeFactory)
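
The pattern used in the hunks above, expressed as a hedged Java sketch (illustrative only,
not part of the patch; "catalogManager" and "legacySchema" are assumed to be in scope):

    // validateSchemaAndApplyImplicitCast now expects a ResolvedSchema, so legacy
    // TableSchema instances are first converted and resolved against the catalog.
    ResolvedSchema resolvedSinkSchema =
            catalogManager.getSchemaResolver().resolve(legacySchema.toSchema());
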
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
index c0e8a76..ae3828a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
@@ -252,7 +252,7 @@ class LegacyCatalogSourceTable[T](
           f.getType
         }
       }
-      factory.buildRelNodeRowType(fieldNames, fieldTypes)
+      factory.buildRelNodeRowType(fieldNames.asScala, fieldTypes)
     }
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
index 87757e8..3ea44f4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
@@ -269,15 +269,4 @@ object TableSinkUtils {
         false)
      }
   }
-
-  /**
-   * Gets the NOT NULL physical field indices on the [[CatalogTable]].
-   */
-  def getNotNullFieldIndices(tableSchema: TableSchema): Array[Int] = {
-    val rowType = tableSchema.toPhysicalRowDataType.getLogicalType.asInstanceOf[RowType]
-    val fieldTypes = rowType.getFields.map(_.getType).toArray
-    fieldTypes.indices.filter { index =>
-      !fieldTypes(index).isNullable
-    }.toArray
-  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
index 2f0566a..386c43e 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
@@ -68,6 +68,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.sourceWatermark;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
@@ -252,6 +254,7 @@ public class DataStreamJavaITCase extends AbstractTestBase {
                         dataStream,
                         Schema.newBuilder()
                                 .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
+                                // uses SQL expressions
                                 .watermark("rowtime", "SOURCE_WATERMARK()")
                                 .build());
 
@@ -313,12 +316,14 @@ public class DataStreamJavaITCase extends AbstractTestBase {
                         changelogStream,
                         Schema.newBuilder()
                                 .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
-                                .watermark("rowtime", "SOURCE_WATERMARK()")
+                                // uses Table API expressions
+                                .columnByExpression("computed", $("f1").upperCase())
+                                .watermark("rowtime", sourceWatermark())
                                 .build());
         tableEnv.createTemporaryView("t", table);
 
         // access and reorder columns
-        final Table reordered = tableEnv.sqlQuery("SELECT f1, rowtime, f0 FROM t");
+        final Table reordered = tableEnv.sqlQuery("SELECT computed, rowtime, f0 FROM t");
 
         // write out the rowtime column with fully declared schema
         final DataStream<Row> result =
@@ -327,6 +332,8 @@ public class DataStreamJavaITCase extends AbstractTestBase {
                         Schema.newBuilder()
                                 .column("f1", DataTypes.STRING())
                                 .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
+                                // uses Table API expressions
+                                .columnByExpression("ignored", $("f1").upperCase())
                                 .column("f0", DataTypes.INT())
                                 .build());
 
@@ -343,9 +350,9 @@ public class DataStreamJavaITCase extends AbstractTestBase {
                                     out.collect(Row.of(key, sum));
                                 })
                         .returns(Types.ROW(Types.STRING, Types.INT)),
-                Row.of("a", 47),
-                Row.of("c", 1000),
-                Row.of("c", 1000));
+                Row.of("A", 47),
+                Row.of("C", 1000),
+                Row.of("C", 1000));
     }
 
     @Test
