This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a46a36b9c3 [FLINK-27263][table] Rename the metadata column to the user-specified name in DDL
8a46a36b9c3 is described below

commit 8a46a36b9c3eaf88da7e139e52e5c9d987f98a3d
Author: Shengkai <1059623...@qq.com>
AuthorDate: Fri Apr 15 16:58:57 2022 +0800

    [FLINK-27263][table] Rename the metadata column to the user-specified name in DDL
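
    Before this change, the planner registered metadata columns in row
    types and plans under an internal "$metadata$<key>" alias; they now
    keep the name declared in the DDL. A minimal illustrative sketch
    (the table name and connector choice are assumptions, not part of
    this commit):

        CREATE TABLE t (
          i INT,
          s STRING,
          myTimestamp TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
          d DOUBLE
        ) WITH ('connector' = 'values');

    A scan of t now lists the metadata field as myTimestamp instead of
    $metadata$timestamp.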
    
    This closes #19521.
---
 .../flink/table/catalog/DefaultSchemaResolver.java | 28 ++++++++++++
 .../flink/table/catalog/SchemaResolutionTest.java  | 20 +++++++++
 .../sink/abilities/SupportsWritingMetadata.java    | 16 ++++---
 .../source/abilities/SupportsReadingMetadata.java  | 11 +++--
 .../table/planner/connectors/DynamicSinkUtils.java | 47 ++++++++++++--------
 .../planner/connectors/DynamicSourceUtils.java     | 47 ++++++++++----------
 .../PushProjectIntoTableSourceScanRule.java        | 30 ++++++++++---
 .../PushProjectIntoTableSourceScanRuleTest.java    | 14 ++++--
 .../file/table/FileSystemTableSourceTest.xml       |  4 +-
 .../planner/plan/batch/sql/TableSourceTest.xml     |  4 +-
 .../testWritingMetadata.out                        |  2 +-
 .../testReadingMetadata.out                        | 10 ++---
 .../PushProjectIntoTableSourceScanRuleTest.xml     | 17 ++++++++
 .../PushWatermarkIntoTableSourceScanRuleTest.xml   |  4 +-
 .../PushLocalAggIntoTableSourceScanRuleTest.xml    |  2 +-
 .../plan/stream/sql/SourceWatermarkTest.xml        |  4 +-
 .../planner/plan/stream/sql/TableScanTest.xml      | 12 ++---
 .../planner/plan/stream/sql/TableSinkTest.xml      | 40 ++++++++---------
 .../planner/plan/stream/sql/TableSourceTest.xml    |  4 +-
 .../planner/plan/stream/sql/TableScanTest.scala    | 51 ++++++++++++++++++++++
 .../runtime/stream/sql/TableSourceITCase.scala     | 14 ++++++
 21 files changed, 278 insertions(+), 103 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
index 0b68aeff79b..f2f011e22c2 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
@@ -43,6 +43,7 @@ import javax.annotation.Nullable;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -95,6 +96,7 @@ class DefaultSchemaResolver implements SchemaResolver {
     private List<Column> resolveColumns(List<Schema.UnresolvedColumn> unresolvedColumns) {
 
         validateDuplicateColumns(unresolvedColumns);
+        validateDuplicateMetadataKeys(unresolvedColumns);
 
         final Column[] resolvedColumns = new Column[unresolvedColumns.size()];
         // process source columns first before computed columns
@@ -175,6 +177,32 @@ class DefaultSchemaResolver implements SchemaResolver {
         }
     }
 
+    private void validateDuplicateMetadataKeys(List<Schema.UnresolvedColumn> columns) {
+        Map<String, String> metadataKeyToColumnNames = new HashMap<>();
+        for (Schema.UnresolvedColumn column : columns) {
+            if (!(column instanceof UnresolvedMetadataColumn)) {
+                continue;
+            }
+
+            UnresolvedMetadataColumn metadataColumn = (UnresolvedMetadataColumn) column;
+            String metadataKey =
+                    metadataColumn.getMetadataKey() == null
+                            ? metadataColumn.getName()
+                            : metadataColumn.getMetadataKey();
+            if (metadataKeyToColumnNames.containsKey(metadataKey)) {
+                throw new ValidationException(
+                        String.format(
+                                "The columns `%s` and `%s` in the table are both from the same metadata key '%s'. "
+                                        + "Please specify one of the columns as the metadata column and use the "
+                                        + "computed column syntax to specify the others.",
+                                metadataKeyToColumnNames.get(metadataKey),
+                                metadataColumn.getName(),
+                                metadataKey));
+            }
+            metadataKeyToColumnNames.put(metadataKey, metadataColumn.getName());
+        }
+    }
+
     private List<WatermarkSpec> resolveWatermarkSpecs(
            List<UnresolvedWatermarkSpec> unresolvedWatermarkSpecs, List<Column> inputColumns) {
         if (unresolvedWatermarkSpecs.size() == 0) {
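
    Note: the validateDuplicateMetadataKeys check above rejects DDLs where two
    columns map to the same metadata key. A minimal sketch of a rejected DDL
    and the rewrite the error message suggests (the 'COLLECTION' connector is
    taken from the tests below):

        -- rejected: `a` and `b` both read metadata key 'a'
        CREATE TABLE source (
          a INT METADATA,
          b INT METADATA FROM 'a'
        ) WITH ('connector' = 'COLLECTION');

        -- accepted: one metadata column, the other derived from it
        CREATE TABLE source (
          a INT METADATA,
          b AS CAST(a AS INT)
        ) WITH ('connector' = 'COLLECTION');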
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
index 6b95170a7a2..be6cbfb42be 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
@@ -219,6 +219,26 @@ public class SchemaResolutionTest {
                Schema.newBuilder().columnByExpression("invalid", callSql("INVALID")).build(),
                 "Invalid expression for computed column 'invalid'.");
 
+        // metadata columns
+
+        testError(
+                Schema.newBuilder()
+                        .columnByMetadata("metadata", DataTypes.INT())
+                        .columnByMetadata("from_metadata", DataTypes.BIGINT(), "metadata", false)
+                        .build(),
+                "The columns `metadata` and `from_metadata` in the table are both from the same metadata key 'metadata'. "
+                        + "Please specify one of the columns as the metadata column and use the "
+                        + "computed column syntax to specify the others.");
+
+        testError(
+                Schema.newBuilder()
+                        .columnByMetadata("from_metadata", DataTypes.BIGINT(), "metadata", false)
+                        .columnByMetadata("from_metadata2", DataTypes.STRING(), "metadata", true)
+                        .build(),
+                "The columns `from_metadata` and `from_metadata2` in the table are both from the same metadata key 'metadata'. "
+                        + "Please specify one of the columns as the metadata column and use the "
+                        + "computed column syntax to specify the others.");
+
         // time attributes and watermarks
 
         testError(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java
index 858bc05ef6c..d789da1d842 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java
@@ -68,13 +68,17 @@ import java.util.Map;
  * casting from INT will be performed by the planner in a preceding operation:
  *
  * <pre>{@code
- * // for t1 and t2
- * ROW < i INT, s STRING, d DOUBLE >                                              // physical input
- * ROW < i INT, s STRING, d DOUBLE, $metadata$timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > // final input
+ * // physical input
+ * ROW < i INT, s STRING, d DOUBLE >
  *
- * // for t3
- * ROW < i INT, s STRING, d DOUBLE >                                              // physical input
- * ROW < i INT, s STRING, d DOUBLE >                                              // final input
+ * // final input (i.e. consumed type) for t1
+ * ROW < i INT, s STRING, d DOUBLE, timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE >
+ *
+ * // final input (i.e. consumed type) for t2
+ * ROW < i INT, s STRING, d DOUBLE, myTimestamp TIMESTAMP(3) WITH LOCAL TIME ZONE >
+ *
+ * // final input (i.e. consumed type) for t3
+ * ROW < i INT, s STRING, d DOUBLE >
  * }</pre>
  */
 @PublicEvolving
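
    For reference, the t1/t2/t3 tables the javadoc example above refers to
    have roughly this shape (a sketch reconstructed from the field names in
    the example; connector options omitted):

        CREATE TABLE t1 (i INT, s STRING, `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA, d DOUBLE);
        CREATE TABLE t2 (i INT, s STRING, myTimestamp TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp', d DOUBLE);
        CREATE TABLE t3 (i INT, s STRING, `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA VIRTUAL, d DOUBLE);

    Only persisted metadata columns appear in the consumed type, which is why
    t3 (VIRTUAL) falls back to the purely physical row.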
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
index ac462b668a9..60b6932b617 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
@@ -76,9 +76,14 @@ import java.util.Map;
  * casting to INT will be performed by the planner in a subsequent operation:
  *
  * <pre>{@code
- * // for t1 and t2
- * ROW < i INT, s STRING, d DOUBLE >                                              // physical output
- * ROW < i INT, s STRING, d DOUBLE, $metadata$timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > // final output
+ * // physical output
+ * ROW < i INT, s STRING, d DOUBLE >
+ *
+ * // final output (i.e. produced type) for t1
+ * ROW < i INT, s STRING, d DOUBLE, timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE >
+ *
+ * // final output (i.e. produced type) for t2
+ * ROW < i INT, s STRING, d DOUBLE, myTimestamp TIMESTAMP(3) WITH LOCAL TIME ZONE >
  * }</pre>
  */
 @PublicEvolving
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index 7a906ffa9bd..b810e71c615 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -66,9 +66,9 @@ import org.apache.calcite.rex.RexNode;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -84,9 +84,6 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.suppor
 @Internal
 public final class DynamicSinkUtils {
 
-    // Ensures that physical and metadata columns don't collide.
-    private static final String METADATA_COLUMN_PREFIX = "$metadata$";
-
     /** Converts an {@link TableResult#collect()} sink to a {@link RelNode}. */
     public static RelNode convertCollectToRel(
             FlinkRelBuilder relBuilder,
@@ -330,7 +327,8 @@ public final class DynamicSinkUtils {
                                         Function.identity()));
 
         final List<Integer> metadataColumns =
-                createRequiredMetadataKeys(schema, sink).stream()
+                createRequiredMetadataColumns(schema, sink).stream()
+                        .map(col -> col.getMetadataKey().orElse(col.getName()))
                         .map(keyToMetadataColumn::get)
                         .collect(Collectors.toList());
 
@@ -405,28 +403,31 @@ public final class DynamicSinkUtils {
     }
 
     /**
-     * Returns a list of required metadata keys. Ordered by the iteration order of {@link
+     * Returns a list of required metadata columns. Ordered by the iteration order of {@link
      * SupportsWritingMetadata#listWritableMetadata()}.
      *
     * <p>This method assumes that sink and schema have been validated via {@link
      * #prepareDynamicSink}.
      */
-    private static List<String> createRequiredMetadataKeys(
+    private static List<MetadataColumn> createRequiredMetadataColumns(
             ResolvedSchema schema, DynamicTableSink sink) {
         final List<Column> tableColumns = schema.getColumns();
        final List<Integer> metadataColumns = extractPersistedMetadataColumns(schema);
 
-        final Set<String> requiredMetadataKeys =
-                metadataColumns.stream()
-                        .map(tableColumns::get)
-                        .map(MetadataColumn.class::cast)
-                        .map(c -> c.getMetadataKey().orElse(c.getName()))
-                        .collect(Collectors.toSet());
+        Map<String, MetadataColumn> metadataKeysToMetadataColumns = new HashMap<>();
+
+        for (Integer columnIndex : metadataColumns) {
+            MetadataColumn metadataColumn = (MetadataColumn) tableColumns.get(columnIndex);
+            String metadataKey = metadataColumn.getMetadataKey().orElse(metadataColumn.getName());
+            // After resolving, every metadata column has a unique metadata key.
+            metadataKeysToMetadataColumns.put(metadataKey, metadataColumn);
+        }
 
         final Map<String, DataType> metadataMap = extractMetadataMap(sink);
 
         return metadataMap.keySet().stream()
-                .filter(requiredMetadataKeys::contains)
+                .filter(metadataKeysToMetadataColumns::containsKey)
+                .map(metadataKeysToMetadataColumns::get)
                 .collect(Collectors.toList());
     }
 
@@ -626,7 +627,9 @@ public final class DynamicSinkUtils {
 
         sinkAbilitySpecs.add(
                 new WritingMetadataSpec(
-                        createRequiredMetadataKeys(schema, sink),
+                        createRequiredMetadataColumns(schema, sink).stream()
+                                .map(col -> col.getMetadataKey().orElse(col.getName()))
+                                .collect(Collectors.toList()),
                         createConsumedType(schema, sink)));
     }
 
@@ -644,12 +647,18 @@ public final class DynamicSinkUtils {
                        .map(c -> new RowField(c.getName(), c.getDataType().getLogicalType()));
 
         final Stream<RowField> metadataFields =
-                createRequiredMetadataKeys(schema, sink).stream()
+                createRequiredMetadataColumns(schema, sink).stream()
                         .map(
-                                k ->
+                                column ->
                                         new RowField(
-                                                METADATA_COLUMN_PREFIX + k,
-                                                metadataMap.get(k).getLogicalType()));
+                                                // Use the alias to ensure that physical and metadata
+                                                // columns don't collide.
+                                                column.getName(),
+                                                metadataMap
+                                                        .get(
+                                                                column.getMetadataKey()
+                                                                        .orElse(column.getName()))
+                                                        .getLogicalType()));
 
         final List<RowField> rowFields =
                Stream.concat(physicalFields, metadataFields).collect(Collectors.toList());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
index fc77b34d3be..6a3104e824e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
@@ -59,9 +59,9 @@ import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -71,9 +71,6 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.suppor
 @Internal
 public final class DynamicSourceUtils {
 
-    // Ensures that physical and metadata columns don't collide.
-    public static final String METADATA_COLUMN_PREFIX = "$metadata$";
-
     /**
     * Converts a given {@link DataStream} to a {@link RelNode}. It adds helper projections if
      * necessary.
@@ -168,26 +165,31 @@ public final class DynamicSourceUtils {
     // TODO: isUpsertSource(), isSourceChangeEventsDuplicate()
 
     /**
-     * Returns a list of required metadata keys. Ordered by the iteration order of {@link
+     * Returns a list of required metadata columns. Ordered by the iteration order of {@link
      * SupportsReadingMetadata#listReadableMetadata()}.
      *
     * <p>This method assumes that source and schema have been validated via {@link
     * #prepareDynamicSource(String, ResolvedCatalogTable, DynamicTableSource, boolean,
      * ReadableConfig)}.
      */
-    public static List<String> createRequiredMetadataKeys(
+    public static List<MetadataColumn> createRequiredMetadataColumns(
             ResolvedSchema schema, DynamicTableSource source) {
        final List<MetadataColumn> metadataColumns = extractMetadataColumns(schema);
 
-        final Set<String> requiredMetadataKeys =
-                metadataColumns.stream()
-                        .map(c -> c.getMetadataKey().orElse(c.getName()))
-                        .collect(Collectors.toSet());
+        Map<String, MetadataColumn> metadataKeysToMetadataColumns = new HashMap<>();
+
+        for (MetadataColumn column : metadataColumns) {
+            String metadataKey = column.getMetadataKey().orElse(column.getName());
+            // After resolving, every metadata column has a unique metadata key.
+            metadataKeysToMetadataColumns.put(metadataKey, column);
+        }
 
         final Map<String, DataType> metadataMap = extractMetadataMap(source);
 
+        // reorder the columns to match the iteration order of the metadata map
         return metadataMap.keySet().stream()
-                .filter(requiredMetadataKeys::contains)
+                .filter(metadataKeysToMetadataColumns::containsKey)
+                .map(metadataKeysToMetadataColumns::get)
                 .collect(Collectors.toList());
     }
 
@@ -206,12 +208,16 @@ public final class DynamicSourceUtils {
                ((RowType) schema.toPhysicalRowDataType().getLogicalType()).getFields().stream();
 
         final Stream<RowField> metadataFields =
-                createRequiredMetadataKeys(schema, source).stream()
+                createRequiredMetadataColumns(schema, source).stream()
                         .map(
                                 k ->
                                         new RowField(
-                                                METADATA_COLUMN_PREFIX + k,
-                                                metadataMap.get(k).getLogicalType()));
+                                                // Use the alias to ensure that physical and
+                                                // metadata columns don't collide.
+                                                k.getName(),
+                                                metadataMap
+                                                        .get(k.getMetadataKey().orElse(k.getName()))
+                                                        .getLogicalType()));
 
         final List<RowField> rowFields =
                Stream.concat(physicalFields, metadataFields).collect(Collectors.toList());
@@ -317,14 +323,9 @@ public final class DynamicSourceUtils {
                                                             c.getDataType().getLogicalType());
                                     if (c instanceof MetadataColumn) {
                                         final MetadataColumn metadataColumn = (MetadataColumn) c;
-                                        final String metadataKey =
-                                                metadataColumn
-                                                        .getMetadataKey()
-                                                        .orElse(metadataColumn.getName());
+                                        String columnName = metadataColumn.getName();
                                         return rexBuilder.makeAbstractCast(
-                                                relDataType,
-                                                relBuilder.field(
-                                                        METADATA_COLUMN_PREFIX + metadataKey));
+                                                relDataType, relBuilder.field(columnName));
                                     } else {
                                         return relBuilder.field(c.getName());
                                     }
@@ -445,7 +446,9 @@ public final class DynamicSourceUtils {
                 });
 
         metadataSource.applyReadableMetadata(
-                createRequiredMetadataKeys(schema, source),
+                createRequiredMetadataColumns(schema, source).stream()
+                        .map(column -> column.getMetadataKey().orElse(column.getName()))
+                        .collect(Collectors.toList()),
                TypeConversions.fromLogicalToDataType(createProducedType(schema, source)));
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
index 58af37cbcbf..17e87a4bd45 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
@@ -60,9 +60,8 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
-import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.METADATA_COLUMN_PREFIX;
 import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createProducedType;
-import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createRequiredMetadataKeys;
+import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createRequiredMetadataColumns;
 import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext;
 import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
 
@@ -250,12 +249,16 @@ public class PushProjectIntoTableSourceScanRule
         final List<NestedColumn> projectedMetadataColumns;
         if (supportsMetadata(source.tableSource())) {
             final List<String> declaredMetadataKeys =
-                    createRequiredMetadataKeys(
-                            source.contextResolvedTable().getResolvedSchema(),
-                            source.tableSource());
+                    createRequiredMetadataColumns(
+                                    source.contextResolvedTable().getResolvedSchema(),
+                                    source.tableSource())
+                            .stream()
+                            .map(col -> col.getMetadataKey().orElse(col.getName()))
+                            .collect(Collectors.toList());

             numPhysicalColumns = producedType.getFieldCount() - declaredMetadataKeys.size();

+            // collect the projected metadata columns by name
             projectedMetadataColumns =
                     IntStream.range(0, declaredMetadataKeys.size())
                             .mapToObj(i -> producedType.getFieldNames().get(numPhysicalColumns + i))
@@ -306,10 +309,23 @@ public class PushProjectIntoTableSourceScanRule
                 (RowType) Projection.of(projectedFields).project(producedType);
 
         if (supportsMetadata(source.tableSource())) {
+            // Use the projected column name to get the metadata key
             final List<String> projectedMetadataKeys =
                     projectedMetadataColumns.stream()
-                            .map(NestedColumn::name)
-                            .map(k -> k.substring(METADATA_COLUMN_PREFIX.length()))
+                            .map(
+                                    nestedColumn ->
+                                            source.contextResolvedTable()
+                                                    .getResolvedSchema()
+                                                    .getColumn(nestedColumn.name())
+                                                    .orElseThrow(
+                                                            () ->
+                                                                    new TableException(
+                                                                            String.format(
+                                                                                    "Cannot find column %s in the original schema.",
+                                                                                    nestedColumn
+                                                                                            .name()))))
+                            .map(Column.MetadataColumn.class::cast)
+                            .map(col -> col.getMetadataKey().orElse(col.getName()))
                             .collect(Collectors.toList());
 
            abilitySpecs.add(new ReadingMetadataSpec(projectedMetadataKeys, newProducedType));
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
index b1bf4aa146c..2d011b26272 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
@@ -113,7 +113,8 @@ public class PushProjectIntoTableSourceScanRuleTest
                         + "  id int,\n"
                        + "  deepNested row<nested1 row<name string, `value` int>, nested2 row<num int, flag boolean>>,\n"
                         + "  metadata_1 int metadata,\n"
-                        + "  metadata_2 string metadata\n"
+                        + "  metadata_2 string metadata,\n"
+                        + "  metadata_3 as cast(metadata_1 as bigint)\n"
                         + ") WITH ("
                         + " 'connector' = 'values',"
                         + " 'nested-projection-supported' = 'true',"
@@ -202,6 +203,13 @@ public class PushProjectIntoTableSourceScanRuleTest
         util().verifyRelPlan(sqlQuery);
     }
 
+    @Test
+    public void testProjectWithDuplicateMetadataKey() {
+        String sqlQuery = "SELECT id, metadata_3, metadata_1 FROM MetadataTable";
+
+        util().verifyRelPlan(sqlQuery);
+    }
+
     @Test
     public void testNestProjectWithMetadata() {
         String sqlQuery =
@@ -341,7 +349,7 @@ public class PushProjectIntoTableSourceScanRuleTest
 
        assertThat(DataType.getFieldNames(appliedProjectionDataType.get())).isEmpty();
         assertThat(DataType.getFieldNames(appliedMetadataDataType.get()))
-                .containsExactly("$metadata$m2");
+                .containsExactly("metadata");
     }
 
     @Test
@@ -364,7 +372,7 @@ public class PushProjectIntoTableSourceScanRuleTest
 
        assertThat(DataType.getFieldNames(appliedProjectionDataType.get())).containsExactly("f1");
         assertThat(DataType.getFieldNames(appliedMetadataDataType.get()))
-                .isEqualTo(Arrays.asList("f1", "$metadata$m2"));
+                .isEqualTo(Arrays.asList("f1", "metadata"));
     }
 
    // ---------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml
index 479eb8ad787..41096c54733 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml
@@ -50,8 +50,8 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, filem
     <Resource name="optimized rel plan">
       <![CDATA[
 Sink(table=[default_catalog.default_database.MySink], fields=[a, b, filemeta])
-+- Calc(select=[a, b, CAST($metadata$file.path AS VARCHAR(2147483647)) AS filemeta])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTableWithMeta, project=[a, b], metadata=[file.path]]], fields=[a, b, $metadata$file.path])
++- Calc(select=[a, b, CAST(filemeta AS VARCHAR(2147483647)) AS filemeta])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTableWithMeta, project=[a, b], metadata=[file.path]]], fields=[a, b, filemeta])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
index 037458a3857..4c134457412 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
@@ -113,8 +113,8 @@ LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + deepNested_nested2_num) + $metadata$metadata_1) AS results])
-+- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]], fields=[id, deepNested_nested1, deepNested_nested2_num, $metadata$metadata_1])
+Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + deepNested_nested2_num) + metadata_1) AS results])
++- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]], fields=[id, deepNested_nested1, deepNested_nested2_num, metadata_1])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out
index 53107af6c88..60411d8a3d9 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out
@@ -69,7 +69,7 @@
       "abilities" : [ {
         "type" : "WritingMetadata",
         "metadataKeys" : [ "m" ],
-        "consumedType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m` VARCHAR(2147483647)> NOT NULL"
+        "consumedType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)> NOT NULL"
       } ]
     },
     "inputChangelogMode" : [ "INSERT" ],
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReadingMetadata.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReadingMetadata.out
index 81bc950fda9..a8014cf4400 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReadingMetadata.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReadingMetadata.out
@@ -40,11 +40,11 @@
       }, {
         "type" : "ReadingMetadata",
         "metadataKeys" : [ "m" ],
-        "producedType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m` VARCHAR(2147483647)> NOT NULL"
+        "producedType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)> NOT NULL"
       } ]
     },
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m` VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable2, project=[a, b], metadata=[m]]], fields=[a, b, $metadata$m])",
+    "outputType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable2, project=[a, b], metadata=[m]]], fields=[a, b, m])",
     "inputProperties" : [ ]
   }, {
     "id" : 2,
@@ -88,8 +88,8 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m` VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.sink], fields=[a, b, $metadata$m])"
+    "outputType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink], fields=[a, b, m])"
   } ],
   "edges" : [ {
     "source" : 1,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
index 2115b4b73f2..9c99e35123b 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
@@ -312,6 +312,23 @@ LogicalProject(id=[$0], EXPR$1=[ITEM($5, _UTF-16LE'e')])
       <![CDATA[
 LogicalProject(id=[$0], EXPR$1=[ITEM($1, _UTF-16LE'e')])
+- LogicalTableScan(table=[[default_catalog, default_database, NestedTable, project=[id, testMap], metadata=[]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testProjectWithDuplicateMetadataKey">
+    <Resource name="sql">
+      <![CDATA[SELECT id, metadata_3, metadata_1 FROM MetadataTable]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], metadata_3=[CAST($2):BIGINT], metadata_1=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(id=[$0], metadata_3=[CAST($1):BIGINT], metadata_1=[$1])
++- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable, project=[id], metadata=[metadata_1]]])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml
index b0874d69e86..ae25b30e846 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml
@@ -111,8 +111,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], metadata=[$3], computed=[$4])
     <Resource name="optimized rel plan">
       <![CDATA[
 FlinkLogicalCalc(select=[a, b, c, metadata, computed])
-+- FlinkLogicalCalc(select=[a, b, Reinterpret(c) AS c, CAST($metadata$metadata_2 AS BIGINT) AS metadata, +(CAST($metadata$metadata_2 AS BIGINT), b) AS computed])
-   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, CAST(+(CAST($metadata$metadata_2 AS BIGINT), +(CAST($metadata$metadata_2 AS BIGINT), b)) AS INTERVAL SECOND))]]], fields=[a, b, c, $metadata$metadata_2])
++- FlinkLogicalCalc(select=[a, b, Reinterpret(c) AS c, CAST(metadata AS BIGINT) AS metadata, +(CAST(metadata AS BIGINT), b) AS computed])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, CAST(+(CAST(metadata AS BIGINT), +(CAST(metadata AS BIGINT), b)) AS INTERVAL SECOND))]]], fields=[a, b, c, metadata])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
index abaf03610cc..b8de2fc902e 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
@@ -332,7 +332,7 @@ LogicalProject(EXPR$0=[$2], EXPR$1=[$3], name=[$0], type=[$1])
 Calc(select=[EXPR$0, EXPR$1, name, type])
 +- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type, Final_SUM(sum$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1])
    +- Exchange(distribution=[hash[name, type]])
-      +- TableSourceScan(table=[[default_catalog, default_database, inventory_meta, filter=[=(id, 123:BIGINT)], project=[name, type, amount], metadata=[metadata_1], aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount),LongMaxAggFunction($metadata$metadata_1)]]]], fields=[name, type, sum$0, max$1])
+      +- TableSourceScan(table=[[default_catalog, default_database, inventory_meta, filter=[=(id, 123:BIGINT)], project=[name, type, amount], metadata=[metadata_1], aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount),LongMaxAggFunction(metadata_1)]]]], fields=[name, type, sum$0, max$1])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
index 88d4eba4d39..b6fb8db11bd 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
@@ -144,7 +144,7 @@ LogicalProject(a=[$0], b=[$1])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b])
-+- TableSourceScan(table=[[default_catalog, default_database, MyLtzTable, project=[a, b], metadata=[originTime], watermark=[TO_TIMESTAMP_LTZ($metadata$originTime, 3)]]], fields=[a, b, $metadata$originTime])
++- TableSourceScan(table=[[default_catalog, default_database, MyLtzTable, project=[a, b], metadata=[originTime], watermark=[TO_TIMESTAMP_LTZ(originTime, 3)]]], fields=[a, b, originTime])
 ]]>
     </Resource>
   </TestCase>
@@ -182,7 +182,7 @@ LogicalProject(a=[$0], b=[$1])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[originTime], watermark=[TO_TIMESTAMP(FROM_UNIXTIME(/($metadata$originTime, 1000)), _UTF-16LE'yyyy-MM-dd HH:mm:ss')]]], fields=[a, b, $metadata$originTime])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[originTime], watermark=[TO_TIMESTAMP(FROM_UNIXTIME(/(originTime, 1000)), _UTF-16LE'yyyy-MM-dd HH:mm:ss')]]], fields=[a, b, originTime])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 7c252f7ae31..62c3334a959 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -192,8 +192,8 @@ LogicalProject(a=[$0], other_metadata=[CAST($4):INTEGER], b=[$1], c=[$2], metada
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, CAST($metadata$metadata_3 AS INTEGER) AS other_metadata, b, c, $metadata$metadata_1 AS metadata_1, UPPER($metadata$metadata_1) AS computed])
-+- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]], fields=[a, b, c, $metadata$metadata_1, $metadata$metadata_3])
+Calc(select=[a, CAST(other_metadata AS INTEGER) AS other_metadata, b, c, metadata_1, UPPER(metadata_1) AS computed])
++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]], fields=[a, b, c, metadata_1, other_metadata])
 ]]>
     </Resource>
   </TestCase>
@@ -209,8 +209,8 @@ LogicalProject(b=[$1], other_metadata=[CAST($4):INTEGER])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[b, CAST($metadata$metadata_3 AS INTEGER) AS other_metadata])
-+- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, project=[b], metadata=[metadata_3]]], fields=[b, $metadata$metadata_3])
+Calc(select=[b, CAST(other_metadata AS INTEGER) AS other_metadata])
++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, project=[b], metadata=[metadata_3]]], fields=[b, other_metadata])
 ]]>
     </Resource>
   </TestCase>
@@ -226,8 +226,8 @@ LogicalProject(timestamp=[$0], metadata_timestamp=[$2], other=[$1], computed_oth
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[timestamp, $metadata$timestamp AS metadata_timestamp, $metadata$other AS other, UPPER($metadata$other) AS computed_other, CAST($metadata$timestamp AS VARCHAR(2147483647)) AS computed_timestamp])
-+- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]], fields=[timestamp, $metadata$other, $metadata$timestamp])
+Calc(select=[timestamp, metadata_timestamp, other, UPPER(other) AS computed_other, CAST(metadata_timestamp AS VARCHAR(2147483647)) AS computed_timestamp])
++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]], fields=[timestamp, other, metadata_timestamp])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
index 41f48c1cc4e..2b8b78770c8 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
@@ -255,6 +255,22 @@ Sink(table=[default_catalog.default_database.sink], fields=[id, city_name])
 }]]>
     </Resource>
   </TestCase>
+  <TestCase name="testInsertPartColumn">
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.zm_test], fields=[a, m1, m2, m3, m4])
++- LogicalProject(a=[CAST($0):BIGINT], m1=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP], m2=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP], m3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP], m4=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.zm_test], fields=[a, m1, m2, m3, m4], changelogMode=[NONE])
++- Calc(select=[CAST(a AS BIGINT) AS a, null:(VARCHAR(2147483647), BIGINT) MAP AS m1, null:(VARCHAR(2147483647), BIGINT) MAP AS m2, null:(VARCHAR(2147483647), BIGINT) MAP AS m3, null:(VARCHAR(2147483647), BIGINT) MAP AS m4], changelogMode=[I])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], changelogMode=[I])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testAppendUpsertAndRetractSink">
     <Resource name="ast">
       <![CDATA[
@@ -489,8 +505,8 @@ LogicalSink(table=[default_catalog.default_database.MetadataTable], fields=[a, b
     <Resource name="optimized rel plan">
       <![CDATA[
 Sink(table=[default_catalog.default_database.MetadataTable], fields=[a, b, c, metadata_1, metadata_2])
-+- Calc(select=[a, b, c, $metadata$metadata_1 AS metadata_1, CAST(CAST($metadata$metadata_2 AS INTEGER) AS BIGINT) AS metadata_2])
-   +- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, project=[a, b, c], metadata=[metadata_1, metadata_2]]], fields=[a, b, c, $metadata$metadata_1, $metadata$metadata_2])
++- Calc(select=[a, b, c, metadata_1, CAST(CAST(m_2 AS INTEGER) AS BIGINT) AS metadata_2])
+   +- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, project=[a, b, c], metadata=[metadata_1, metadata_2]]], fields=[a, b, c, metadata_1, m_2])
 ]]>
     </Resource>
   </TestCase>
@@ -504,8 +520,8 @@ LogicalSink(table=[default_catalog.default_database.MetadataTable], fields=[meta
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.MetadataTable], fields=[metadata_1, metadata_2, other, $metadata$metadata_2])
-+- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, project=[metadata_1, metadata_2, other], metadata=[metadata_2]]], fields=[metadata_1, metadata_2, other, $metadata$metadata_2])
+Sink(table=[default_catalog.default_database.MetadataTable], fields=[metadata_1, metadata_2, other, m_2])
++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, project=[metadata_1, metadata_2, other], metadata=[metadata_2]]], fields=[metadata_1, metadata_2, other, m_2])
 ]]>
     </Resource>
   </TestCase>
@@ -763,20 +779,4 @@ Sink(table=[default_catalog.default_database.SinkJoinChangeLog], fields=[person,
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testInsertPartColumn">
-       <Resource name="ast">
-         <![CDATA[
-LogicalSink(table=[default_catalog.default_database.zm_test], fields=[a, m1, m2, m3, m4])
-+- LogicalProject(a=[CAST($0):BIGINT], m1=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP], m2=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP], m3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP], m4=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
-       </Resource>
-       <Resource name="optimized rel plan">
-         <![CDATA[
-Sink(table=[default_catalog.default_database.zm_test], fields=[a, m1, m2, m3, m4], changelogMode=[NONE])
-+- Calc(select=[CAST(a AS BIGINT) AS a, null:(VARCHAR(2147483647), BIGINT) MAP AS m1, null:(VARCHAR(2147483647), BIGINT) MAP AS m2, null:(VARCHAR(2147483647), BIGINT) MAP AS m3, null:(VARCHAR(2147483647), BIGINT) MAP AS m4], changelogMode=[I])
-   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], changelogMode=[I])
-]]>
-       </Resource>
-  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
index 648a40ad94e..f8009ff2197 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
@@ -79,8 +79,8 @@ LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + deepNested_nested2_num) + $metadata$metadata_1) AS results])
-+- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]], fields=[id, deepNested_nested1, deepNested_nested2_num, $metadata$metadata_1])
+Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + deepNested_nested2_num) + metadata_1) AS results])
++- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]], fields=[id, deepNested_nested1, deepNested_nested2_num, metadata_1])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index 6cf12b33968..8786641ec6f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -18,12 +18,15 @@
 package org.apache.flink.table.planner.plan.stream.sql
 
 import org.apache.flink.api.scala._
+import org.apache.flink.core.testutils.FlinkAssertions
+import org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.planner.expressions.utils.Func0
 import org.apache.flink.table.planner.factories.TestValuesTableFactory.MockedLookupTableSource
 import org.apache.flink.table.planner.utils.TableTestBase
 
+import org.assertj.core.api.Assertions.assertThatThrownBy
 import org.junit.Test
 
 class TableScanTest extends TableTestBase {
@@ -172,6 +175,54 @@ class TableScanTest extends TableTestBase {
     util.verifyExecPlan("SELECT * FROM src WHERE a > 1")
   }
 
+  @Test
+  def testDDLWithMultipleColumnsFromSameMetadataKey(): Unit = {
+    assertThatThrownBy(() => util.tableEnv.executeSql("""
+                                                        |CREATE TABLE source (
+                                                        |  a INT METADATA,
+                                                        |  b INT METADATA FROM 'a'
+                                                        |) WITH (
+                                                        |  'connector' = 'COLLECTION'
+                                                        |)
+                                                        |""".stripMargin)).satisfies(
+      FlinkAssertions.anyCauseMatches(
+        classOf[ValidationException],
+        "The columns `a` and `b` in the table are both from the same metadata key 'a'. " +
+          "Please specify one of the columns as the metadata column and use the computed column" +
+          " syntax to specify the others."
+      ))
+  }
+
+  @Test
+  def testDDLWithMultipleColumnsFromSameMetadataKey2(): Unit = {
+    util.tableEnv.executeSql("""
+                               |CREATE TABLE source (
+                               |  a INT METADATA
+                               |) WITH (
+                               |  'connector' = 'COLLECTION'
+                               |)
+                               |""".stripMargin)
+    assertThatThrownBy(
+      () =>
+        util.tableEnv.executeSql(
+          """
+            |CREATE TABLE like_source (
+            |  b INT METADATA FROM 'a'
+            |)
+            |WITH (
+            |  'connector' = 'COLLECTION'
+            |) LIKE source (
+            |  INCLUDING METADATA
+            |)
+            |""".stripMargin
+        )).satisfies(
+      anyCauseMatches(
+        "The columns `a` and `b` in the table are both from the same metadata key 'a'. " +
+          "Please specify one of the columns as the metadata column and use the computed column" +
+          " syntax to specify the others."
+      ))
+  }
+
   @Test
   def testKeywordsWithWatermarkComputedColumn(): Unit = {
     // Create table with field as atom expression.
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
index 86d3594a002..e19a145125a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala
@@ -74,6 +74,7 @@ class TableSourceITCase extends StreamingTestBase {
          |CREATE TABLE MetadataTable (
          |  `a` INT,
          |  `other_metadata` INT METADATA FROM 'metadata_3',
+         |  `other_metadata2` AS CAST(`other_metadata` AS BIGINT),
          |  `b` BIGINT,
          |  `metadata_1` INT METADATA,
          |  `computed` AS `metadata_1` * 2,
@@ -302,6 +303,19 @@ class TableSourceITCase extends StreamingTestBase {
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
+  @Test
+  def testDuplicateMetadataFromSameKey(): Unit = {
+    val result = tEnv
+      .sqlQuery("SELECT other_metadata, other_metadata2, metadata_2 FROM MetadataTable")
+      .toAppendStream[Row]
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = Seq("1,1,Hallo", "1,1,Hallo Welt wie", "2,2,Hallo Welt")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
   @Test
   def testNestedProjectionWithMetadataAccess(): Unit = {
     val query =