This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 8a46a36b9c3 [FLINK-27263][table] Rename the metadata column to the user specified name in DDL 8a46a36b9c3 is described below commit 8a46a36b9c3eaf88da7e139e52e5c9d987f98a3d Author: Shengkai <1059623...@qq.com> AuthorDate: Fri Apr 15 16:58:57 2022 +0800 [FLINK-27263][table] Rename the metadata column to the user specified name in DDL This closes #19521. --- .../flink/table/catalog/DefaultSchemaResolver.java | 28 ++++++++++++ .../flink/table/catalog/SchemaResolutionTest.java | 20 +++++++++ .../sink/abilities/SupportsWritingMetadata.java | 16 ++++--- .../source/abilities/SupportsReadingMetadata.java | 11 +++-- .../table/planner/connectors/DynamicSinkUtils.java | 47 ++++++++++++-------- .../planner/connectors/DynamicSourceUtils.java | 47 ++++++++++---------- .../PushProjectIntoTableSourceScanRule.java | 30 ++++++++++--- .../PushProjectIntoTableSourceScanRuleTest.java | 14 ++++-- .../file/table/FileSystemTableSourceTest.xml | 4 +- .../planner/plan/batch/sql/TableSourceTest.xml | 4 +- .../testWritingMetadata.out | 2 +- .../testReadingMetadata.out | 10 ++--- .../PushProjectIntoTableSourceScanRuleTest.xml | 17 ++++++++ .../PushWatermarkIntoTableSourceScanRuleTest.xml | 4 +- .../PushLocalAggIntoTableSourceScanRuleTest.xml | 2 +- .../plan/stream/sql/SourceWatermarkTest.xml | 4 +- .../planner/plan/stream/sql/TableScanTest.xml | 12 ++--- .../planner/plan/stream/sql/TableSinkTest.xml | 40 ++++++++--------- .../planner/plan/stream/sql/TableSourceTest.xml | 4 +- .../planner/plan/stream/sql/TableScanTest.scala | 51 ++++++++++++++++++++++ .../runtime/stream/sql/TableSourceITCase.scala | 14 ++++++ 21 files changed, 278 insertions(+), 103 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java index 0b68aeff79b..f2f011e22c2 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java @@ -43,6 +43,7 @@ import javax.annotation.Nullable; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -95,6 +96,7 @@ class DefaultSchemaResolver implements SchemaResolver { private List<Column> resolveColumns(List<Schema.UnresolvedColumn> unresolvedColumns) { validateDuplicateColumns(unresolvedColumns); + validateDuplicateMetadataKeys(unresolvedColumns); final Column[] resolvedColumns = new Column[unresolvedColumns.size()]; // process source columns first before computed columns @@ -175,6 +177,32 @@ class DefaultSchemaResolver implements SchemaResolver { } } + private void validateDuplicateMetadataKeys(List<Schema.UnresolvedColumn> columns) { + Map<String, String> metadataKeyToColumnNames = new HashMap<>(); + for (Schema.UnresolvedColumn column : columns) { + if (!(column instanceof UnresolvedMetadataColumn)) { + continue; + } + + UnresolvedMetadataColumn metadataColumn = (UnresolvedMetadataColumn) column; + String metadataKey = + metadataColumn.getMetadataKey() == null + ? metadataColumn.getName() + : metadataColumn.getMetadataKey(); + if (metadataKeyToColumnNames.containsKey(metadataKey)) { + throw new ValidationException( + String.format( + "The column `%s` and `%s` in the table are both from the same metadata key '%s'. " + + "Please specify one of the columns as the metadata column and use the " + + "computed column syntax to specify the others.", + metadataKeyToColumnNames.get(metadataKey), + metadataColumn.getName(), + metadataKey)); + } + metadataKeyToColumnNames.put(metadataKey, metadataColumn.getName()); + } + } + private List<WatermarkSpec> resolveWatermarkSpecs( List<UnresolvedWatermarkSpec> unresolvedWatermarkSpecs, List<Column> inputColumns) { if (unresolvedWatermarkSpecs.size() == 0) { diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java index 6b95170a7a2..be6cbfb42be 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java @@ -219,6 +219,26 @@ public class SchemaResolutionTest { Schema.newBuilder().columnByExpression("invalid", callSql("INVALID")).build(), "Invalid expression for computed column 'invalid'."); + // metadata columns + + testError( + Schema.newBuilder() + .columnByMetadata("metadata", DataTypes.INT()) + .columnByMetadata("from_metadata", DataTypes.BIGINT(), "metadata", false) + .build(), + "The column `metadata` and `from_metadata` in the table are both from the same metadata key 'metadata'. " + + "Please specify one of the columns as the metadata column and use the " + + "computed column syntax to specify the others."); + + testError( + Schema.newBuilder() + .columnByMetadata("from_metadata", DataTypes.BIGINT(), "metadata", false) + .columnByMetadata("from_metadata2", DataTypes.STRING(), "metadata", true) + .build(), + "The column `from_metadata` and `from_metadata2` in the table are both from the same metadata key 'metadata'. " + + "Please specify one of the columns as the metadata column and use the " + + "computed column syntax to specify the others."); + // time attributes and watermarks testError( diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java index 858bc05ef6c..d789da1d842 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java @@ -68,13 +68,17 @@ import java.util.Map; * casting from INT will be performed by the planner in a preceding operation: * * <pre>{@code - * // for t1 and t2 - * ROW < i INT, s STRING, d DOUBLE > // physical input - * ROW < i INT, s STRING, d DOUBLE, $metadata$timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > // final input + * // physical input + * ROW < i INT, s STRING, d DOUBLE > * - * // for t3 - * ROW < i INT, s STRING, d DOUBLE > // physical input - * ROW < i INT, s STRING, d DOUBLE > // final input + * // final input (i.e. consumed type) for t1 + * ROW < i INT, s STRING, d DOUBLE, timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > + * + * // final input (i.e. consumed type) for t2 + * ROW < i INT, s STRING, d DOUBLE, myTimestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > + * + * // final input (i.e. consumed type) for t3 + * ROW < i INT, s STRING, d DOUBLE > * }</pre> */ @PublicEvolving diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java index ac462b668a9..60b6932b617 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java @@ -76,9 +76,14 @@ import java.util.Map; * casting to INT will be performed by the planner in a subsequent operation: * * <pre>{@code - * // for t1 and t2 - * ROW < i INT, s STRING, d DOUBLE > // physical output - * ROW < i INT, s STRING, d DOUBLE, $metadata$timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > // final output + * // physical output + * ROW < i INT, s STRING, d DOUBLE > + * + * // final output (i.e. produced type) for t1 + * ROW < i INT, s STRING, d DOUBLE, timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > + * + * // final output (i.e. produced type) for t2 + * ROW < i INT, s STRING, d DOUBLE, myTimestamp TIMESTAMP(3) WITH LOCAL TIME ZONE > * }</pre> */ @PublicEvolving diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java index 7a906ffa9bd..b810e71c615 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java @@ -66,9 +66,9 @@ import org.apache.calcite.rex.RexNode; import java.time.ZoneId; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -84,9 +84,6 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.suppor @Internal public final class DynamicSinkUtils { - // Ensures that physical and metadata columns don't collide. - private static final String METADATA_COLUMN_PREFIX = "$metadata$"; - /** Converts an {@link TableResult#collect()} sink to a {@link RelNode}. */ public static RelNode convertCollectToRel( FlinkRelBuilder relBuilder, @@ -330,7 +327,8 @@ public final class DynamicSinkUtils { Function.identity())); final List<Integer> metadataColumns = - createRequiredMetadataKeys(schema, sink).stream() + createRequiredMetadataColumns(schema, sink).stream() + .map(col -> col.getMetadataKey().orElse(col.getName())) .map(keyToMetadataColumn::get) .collect(Collectors.toList()); @@ -405,28 +403,31 @@ public final class DynamicSinkUtils { } /** - * Returns a list of required metadata keys. Ordered by the iteration order of {@link + * Returns a list of required metadata columns. Ordered by the iteration order of {@link * SupportsWritingMetadata#listWritableMetadata()}. * * <p>This method assumes that sink and schema have been validated via {@link * #prepareDynamicSink}. */ - private static List<String> createRequiredMetadataKeys( + private static List<MetadataColumn> createRequiredMetadataColumns( ResolvedSchema schema, DynamicTableSink sink) { final List<Column> tableColumns = schema.getColumns(); final List<Integer> metadataColumns = extractPersistedMetadataColumns(schema); - final Set<String> requiredMetadataKeys = - metadataColumns.stream() - .map(tableColumns::get) - .map(MetadataColumn.class::cast) - .map(c -> c.getMetadataKey().orElse(c.getName())) - .collect(Collectors.toSet()); + Map<String, MetadataColumn> metadataKeysToMetadataColumns = new HashMap<>(); + + for (Integer columnIndex : metadataColumns) { + MetadataColumn metadataColumn = (MetadataColumn) tableColumns.get(columnIndex); + String metadataKey = metadataColumn.getMetadataKey().orElse(metadataColumn.getName()); + // After resolving, every metadata column has the unique metadata key. + metadataKeysToMetadataColumns.put(metadataKey, metadataColumn); + } final Map<String, DataType> metadataMap = extractMetadataMap(sink); return metadataMap.keySet().stream() - .filter(requiredMetadataKeys::contains) + .filter(metadataKeysToMetadataColumns::containsKey) + .map(metadataKeysToMetadataColumns::get) .collect(Collectors.toList()); } @@ -626,7 +627,9 @@ public final class DynamicSinkUtils { sinkAbilitySpecs.add( new WritingMetadataSpec( - createRequiredMetadataKeys(schema, sink), + createRequiredMetadataColumns(schema, sink).stream() + .map(col -> col.getMetadataKey().orElse(col.getName())) + .collect(Collectors.toList()), createConsumedType(schema, sink))); } @@ -644,12 +647,18 @@ public final class DynamicSinkUtils { .map(c -> new RowField(c.getName(), c.getDataType().getLogicalType())); final Stream<RowField> metadataFields = - createRequiredMetadataKeys(schema, sink).stream() + createRequiredMetadataColumns(schema, sink).stream() .map( - k -> + column -> new RowField( - METADATA_COLUMN_PREFIX + k, - metadataMap.get(k).getLogicalType())); + // Use alias to ensures that physical and metadata + // columns don't collide. + column.getName(), + metadataMap + .get( + column.getMetadataKey() + .orElse(column.getName())) + .getLogicalType())); final List<RowField> rowFields = Stream.concat(physicalFields, metadataFields).collect(Collectors.toList()); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java index fc77b34d3be..6a3104e824e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java @@ -59,9 +59,9 @@ import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -71,9 +71,6 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.suppor @Internal public final class DynamicSourceUtils { - // Ensures that physical and metadata columns don't collide. - public static final String METADATA_COLUMN_PREFIX = "$metadata$"; - /** * Converts a given {@link DataStream} to a {@link RelNode}. It adds helper projections if * necessary. @@ -168,26 +165,31 @@ public final class DynamicSourceUtils { // TODO: isUpsertSource(), isSourceChangeEventsDuplicate() /** - * Returns a list of required metadata keys. Ordered by the iteration order of {@link + * Returns a list of required metadata columns. Ordered by the iteration order of {@link * SupportsReadingMetadata#listReadableMetadata()}. * * <p>This method assumes that source and schema have been validated via {@link * #prepareDynamicSource(String, ResolvedCatalogTable, DynamicTableSource, boolean, * ReadableConfig)}. */ - public static List<String> createRequiredMetadataKeys( + public static List<MetadataColumn> createRequiredMetadataColumns( ResolvedSchema schema, DynamicTableSource source) { final List<MetadataColumn> metadataColumns = extractMetadataColumns(schema); - final Set<String> requiredMetadataKeys = - metadataColumns.stream() - .map(c -> c.getMetadataKey().orElse(c.getName())) - .collect(Collectors.toSet()); + Map<String, MetadataColumn> metadataKeysToMetadataColumns = new HashMap<>(); + + for (MetadataColumn column : metadataColumns) { + String metadataKey = column.getMetadataKey().orElse(column.getName()); + // After resolving, every metadata column has the unique metadata key. + metadataKeysToMetadataColumns.put(metadataKey, column); + } final Map<String, DataType> metadataMap = extractMetadataMap(source); + // reorder the column return metadataMap.keySet().stream() - .filter(requiredMetadataKeys::contains) + .filter(metadataKeysToMetadataColumns::containsKey) + .map(metadataKeysToMetadataColumns::get) .collect(Collectors.toList()); } @@ -206,12 +208,16 @@ public final class DynamicSourceUtils { ((RowType) schema.toPhysicalRowDataType().getLogicalType()).getFields().stream(); final Stream<RowField> metadataFields = - createRequiredMetadataKeys(schema, source).stream() + createRequiredMetadataColumns(schema, source).stream() .map( k -> new RowField( - METADATA_COLUMN_PREFIX + k, - metadataMap.get(k).getLogicalType())); + // Use the alias to ensure that physical and + // metadata columns don't collide + k.getName(), + metadataMap + .get(k.getMetadataKey().orElse(k.getName())) + .getLogicalType())); final List<RowField> rowFields = Stream.concat(physicalFields, metadataFields).collect(Collectors.toList()); @@ -317,14 +323,9 @@ public final class DynamicSourceUtils { c.getDataType().getLogicalType()); if (c instanceof MetadataColumn) { final MetadataColumn metadataColumn = (MetadataColumn) c; - final String metadataKey = - metadataColumn - .getMetadataKey() - .orElse(metadataColumn.getName()); + String columnName = metadataColumn.getName(); return rexBuilder.makeAbstractCast( - relDataType, - relBuilder.field( - METADATA_COLUMN_PREFIX + metadataKey)); + relDataType, relBuilder.field(columnName)); } else { return relBuilder.field(c.getName()); } @@ -445,7 +446,9 @@ public final class DynamicSourceUtils { }); metadataSource.applyReadableMetadata( - createRequiredMetadataKeys(schema, source), + createRequiredMetadataColumns(schema, source).stream() + .map(column -> column.getMetadataKey().orElse(column.getName())) + .collect(Collectors.toList()), TypeConversions.fromLogicalToDataType(createProducedType(schema, source))); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java index 58af37cbcbf..17e87a4bd45 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java @@ -60,9 +60,8 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.METADATA_COLUMN_PREFIX; import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createProducedType; -import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createRequiredMetadataKeys; +import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createRequiredMetadataColumns; import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext; import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory; @@ -250,12 +249,16 @@ public class PushProjectIntoTableSourceScanRule final List<NestedColumn> projectedMetadataColumns; if (supportsMetadata(source.tableSource())) { final List<String> declaredMetadataKeys = - createRequiredMetadataKeys( - source.contextResolvedTable().getResolvedSchema(), - source.tableSource()); + createRequiredMetadataColumns( + source.contextResolvedTable().getResolvedSchema(), + source.tableSource()) + .stream() + .map(col -> col.getMetadataKey().orElse(col.getName())) + .collect(Collectors.toList()); numPhysicalColumns = producedType.getFieldCount() - declaredMetadataKeys.size(); + // the projected metadata column name projectedMetadataColumns = IntStream.range(0, declaredMetadataKeys.size()) .mapToObj(i -> producedType.getFieldNames().get(numPhysicalColumns + i)) @@ -306,10 +309,23 @@ public class PushProjectIntoTableSourceScanRule (RowType) Projection.of(projectedFields).project(producedType); if (supportsMetadata(source.tableSource())) { + // Use the projected column name to get the metadata key final List<String> projectedMetadataKeys = projectedMetadataColumns.stream() - .map(NestedColumn::name) - .map(k -> k.substring(METADATA_COLUMN_PREFIX.length())) + .map( + nestedColumn -> + source.contextResolvedTable() + .getResolvedSchema() + .getColumn(nestedColumn.name()) + .orElseThrow( + () -> + new TableException( + String.format( + "Can not find the column %s in the origin schema.", + nestedColumn + .name())))) + .map(Column.MetadataColumn.class::cast) + .map(col -> col.getMetadataKey().orElse(col.getName())) .collect(Collectors.toList()); abilitySpecs.add(new ReadingMetadataSpec(projectedMetadataKeys, newProducedType)); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java index b1bf4aa146c..2d011b26272 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java @@ -113,7 +113,8 @@ public class PushProjectIntoTableSourceScanRuleTest + " id int,\n" + " deepNested row<nested1 row<name string, `value` int>, nested2 row<num int, flag boolean>>,\n" + " metadata_1 int metadata,\n" - + " metadata_2 string metadata\n" + + " metadata_2 string metadata,\n" + + " metadata_3 as cast(metadata_1 as bigint)\n" + ") WITH (" + " 'connector' = 'values'," + " 'nested-projection-supported' = 'true'," @@ -202,6 +203,13 @@ public class PushProjectIntoTableSourceScanRuleTest util().verifyRelPlan(sqlQuery); } + @Test + public void testProjectWithDuplicateMetadataKey() { + String sqlQuery = "SELECT id, metadata_3, metadata_1 FROM MetadataTable"; + + util().verifyRelPlan(sqlQuery); + } + @Test public void testNestProjectWithMetadata() { String sqlQuery = @@ -341,7 +349,7 @@ public class PushProjectIntoTableSourceScanRuleTest assertThat(DataType.getFieldNames(appliedProjectionDataType.get())).isEmpty(); assertThat(DataType.getFieldNames(appliedMetadataDataType.get())) - .containsExactly("$metadata$m2"); + .containsExactly("metadata"); } @Test @@ -364,7 +372,7 @@ public class PushProjectIntoTableSourceScanRuleTest assertThat(DataType.getFieldNames(appliedProjectionDataType.get())).containsExactly("f1"); assertThat(DataType.getFieldNames(appliedMetadataDataType.get())) - .isEqualTo(Arrays.asList("f1", "$metadata$m2")); + .isEqualTo(Arrays.asList("f1", "metadata")); } // --------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml index 479eb8ad787..41096c54733 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/FileSystemTableSourceTest.xml @@ -50,8 +50,8 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, filem <Resource name="optimized rel plan"> <![CDATA[ Sink(table=[default_catalog.default_database.MySink], fields=[a, b, filemeta]) -+- Calc(select=[a, b, CAST($metadata$file.path AS VARCHAR(2147483647)) AS filemeta]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTableWithMeta, project=[a, b], metadata=[file.path]]], fields=[a, b, $metadata$file.path]) ++- Calc(select=[a, b, CAST(filemeta AS VARCHAR(2147483647)) AS filemeta]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTableWithMeta, project=[a, b], metadata=[file.path]]], fields=[a, b, filemeta]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml index 037458a3857..4c134457412 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml @@ -113,8 +113,8 @@ LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1. </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + deepNested_nested2_num) + $metadata$metadata_1) AS results]) -+- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]], fields=[id, deepNested_nested1, deepNested_nested2_num, $metadata$metadata_1]) +Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + deepNested_nested2_num) + metadata_1) AS results]) ++- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]], fields=[id, deepNested_nested1, deepNested_nested2_num, metadata_1]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out index 53107af6c88..60411d8a3d9 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testWritingMetadata.out @@ -69,7 +69,7 @@ "abilities" : [ { "type" : "WritingMetadata", "metadataKeys" : [ "m" ], - "consumedType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m` VARCHAR(2147483647)> NOT NULL" + "consumedType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)> NOT NULL" } ] }, "inputChangelogMode" : [ "INSERT" ], diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReadingMetadata.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReadingMetadata.out index 81bc950fda9..a8014cf4400 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReadingMetadata.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testReadingMetadata.out @@ -40,11 +40,11 @@ }, { "type" : "ReadingMetadata", "metadataKeys" : [ "m" ], - "producedType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m` VARCHAR(2147483647)> NOT NULL" + "producedType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)> NOT NULL" } ] }, - "outputType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m` VARCHAR(2147483647)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable2, project=[a, b], metadata=[m]]], fields=[a, b, $metadata$m])", + "outputType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable2, project=[a, b], metadata=[m]]], fields=[a, b, m])", "inputProperties" : [ ] }, { "id" : 2, @@ -88,8 +88,8 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `b` INT, `$metadata$m` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.sink], fields=[a, b, $metadata$m])" + "outputType" : "ROW<`a` BIGINT, `b` INT, `m` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink], fields=[a, b, m])" } ], "edges" : [ { "source" : 1, diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml index 2115b4b73f2..9c99e35123b 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml @@ -312,6 +312,23 @@ LogicalProject(id=[$0], EXPR$1=[ITEM($5, _UTF-16LE'e')]) <![CDATA[ LogicalProject(id=[$0], EXPR$1=[ITEM($1, _UTF-16LE'e')]) +- LogicalTableScan(table=[[default_catalog, default_database, NestedTable, project=[id, testMap], metadata=[]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testProjectWithDuplicateMetadataKey"> + <Resource name="sql"> + <![CDATA[SELECT id, metadata_3, metadata_1 FROM MetadataTable]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(id=[$0], metadata_3=[CAST($2):BIGINT], metadata_1=[$2]) ++- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +LogicalProject(id=[$0], metadata_3=[CAST($1):BIGINT], metadata_1=[$1]) ++- LogicalTableScan(table=[[default_catalog, default_database, MetadataTable, project=[id], metadata=[metadata_1]]]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml index b0874d69e86..ae25b30e846 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml @@ -111,8 +111,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], metadata=[$3], computed=[$4]) <Resource name="optimized rel plan"> <![CDATA[ FlinkLogicalCalc(select=[a, b, c, metadata, computed]) -+- FlinkLogicalCalc(select=[a, b, Reinterpret(c) AS c, CAST($metadata$metadata_2 AS BIGINT) AS metadata, +(CAST($metadata$metadata_2 AS BIGINT), b) AS computed]) - +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, CAST(+(CAST($metadata$metadata_2 AS BIGINT), +(CAST($metadata$metadata_2 AS BIGINT), b)) AS INTERVAL SECOND))]]], fields=[a, b, c, $metadata$metadata_2]) ++- FlinkLogicalCalc(select=[a, b, Reinterpret(c) AS c, CAST(metadata AS BIGINT) AS metadata, +(CAST(metadata AS BIGINT), b) AS computed]) + +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-(c, CAST(+(CAST(metadata AS BIGINT), +(CAST(metadata AS BIGINT), b)) AS INTERVAL SECOND))]]], fields=[a, b, c, metadata]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml index abaf03610cc..b8de2fc902e 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml @@ -332,7 +332,7 @@ LogicalProject(EXPR$0=[$2], EXPR$1=[$3], name=[$0], type=[$1]) Calc(select=[EXPR$0, EXPR$1, name, type]) +- HashAggregate(isMerge=[true], groupBy=[name, type], select=[name, type, Final_SUM(sum$0) AS EXPR$0, Final_MAX(max$1) AS EXPR$1]) +- Exchange(distribution=[hash[name, type]]) - +- TableSourceScan(table=[[default_catalog, default_database, inventory_meta, filter=[=(id, 123:BIGINT)], project=[name, type, amount], metadata=[metadata_1], aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount),LongMaxAggFunction($metadata$metadata_1)]]]], fields=[name, type, sum$0, max$1]) + +- TableSourceScan(table=[[default_catalog, default_database, inventory_meta, filter=[=(id, 123:BIGINT)], project=[name, type, amount], metadata=[metadata_1], aggregates=[grouping=[name,type], aggFunctions=[LongSumAggFunction(amount),LongMaxAggFunction(metadata_1)]]]], fields=[name, type, sum$0, max$1]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml index 88d4eba4d39..b6fb8db11bd 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml @@ -144,7 +144,7 @@ LogicalProject(a=[$0], b=[$1]) <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, b]) -+- TableSourceScan(table=[[default_catalog, default_database, MyLtzTable, project=[a, b], metadata=[originTime], watermark=[TO_TIMESTAMP_LTZ($metadata$originTime, 3)]]], fields=[a, b, $metadata$originTime]) ++- TableSourceScan(table=[[default_catalog, default_database, MyLtzTable, project=[a, b], metadata=[originTime], watermark=[TO_TIMESTAMP_LTZ(originTime, 3)]]], fields=[a, b, originTime]) ]]> </Resource> </TestCase> @@ -182,7 +182,7 @@ LogicalProject(a=[$0], b=[$1]) <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, b]) -+- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[originTime], watermark=[TO_TIMESTAMP(FROM_UNIXTIME(/($metadata$originTime, 1000)), _UTF-16LE'yyyy-MM-dd HH:mm:ss')]]], fields=[a, b, $metadata$originTime]) ++- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[originTime], watermark=[TO_TIMESTAMP(FROM_UNIXTIME(/(originTime, 1000)), _UTF-16LE'yyyy-MM-dd HH:mm:ss')]]], fields=[a, b, originTime]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml index 7c252f7ae31..62c3334a959 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml @@ -192,8 +192,8 @@ LogicalProject(a=[$0], other_metadata=[CAST($4):INTEGER], b=[$1], c=[$2], metada </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[a, CAST($metadata$metadata_3 AS INTEGER) AS other_metadata, b, c, $metadata$metadata_1 AS metadata_1, UPPER($metadata$metadata_1) AS computed]) -+- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]], fields=[a, b, c, $metadata$metadata_1, $metadata$metadata_3]) +Calc(select=[a, CAST(other_metadata AS INTEGER) AS other_metadata, b, c, metadata_1, UPPER(metadata_1) AS computed]) ++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]], fields=[a, b, c, metadata_1, other_metadata]) ]]> </Resource> </TestCase> @@ -209,8 +209,8 @@ LogicalProject(b=[$1], other_metadata=[CAST($4):INTEGER]) </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[b, CAST($metadata$metadata_3 AS INTEGER) AS other_metadata]) -+- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, project=[b], metadata=[metadata_3]]], fields=[b, $metadata$metadata_3]) +Calc(select=[b, CAST(other_metadata AS INTEGER) AS other_metadata]) ++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, project=[b], metadata=[metadata_3]]], fields=[b, other_metadata]) ]]> </Resource> </TestCase> @@ -226,8 +226,8 @@ LogicalProject(timestamp=[$0], metadata_timestamp=[$2], other=[$1], computed_oth </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[timestamp, $metadata$timestamp AS metadata_timestamp, $metadata$other AS other, UPPER($metadata$other) AS computed_other, CAST($metadata$timestamp AS VARCHAR(2147483647)) AS computed_timestamp]) -+- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]], fields=[timestamp, $metadata$other, $metadata$timestamp]) +Calc(select=[timestamp, metadata_timestamp, other, UPPER(other) AS computed_other, CAST(metadata_timestamp AS VARCHAR(2147483647)) AS computed_timestamp]) ++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable]], fields=[timestamp, other, metadata_timestamp]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml index 41f48c1cc4e..2b8b78770c8 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml @@ -255,6 +255,22 @@ Sink(table=[default_catalog.default_database.sink], fields=[id, city_name]) }]]> </Resource> </TestCase> + <TestCase name="testInsertPartColumn"> + <Resource name="ast"> + <![CDATA[ +LogicalSink(table=[default_catalog.default_database.zm_test], fields=[a, m1, m2, m3, m4]) ++- LogicalProject(a=[CAST($0):BIGINT], m1=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP], m2=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP], m3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP], m4=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Sink(table=[default_catalog.default_database.zm_test], fields=[a, m1, m2, m3, m4], changelogMode=[NONE]) ++- Calc(select=[CAST(a AS BIGINT) AS a, null:(VARCHAR(2147483647), BIGINT) MAP AS m1, null:(VARCHAR(2147483647), BIGINT) MAP AS m2, null:(VARCHAR(2147483647), BIGINT) MAP AS m3, null:(VARCHAR(2147483647), BIGINT) MAP AS m4], changelogMode=[I]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], changelogMode=[I]) +]]> + </Resource> + </TestCase> <TestCase name="testAppendUpsertAndRetractSink"> <Resource name="ast"> <![CDATA[ @@ -489,8 +505,8 @@ LogicalSink(table=[default_catalog.default_database.MetadataTable], fields=[a, b <Resource name="optimized rel plan"> <![CDATA[ Sink(table=[default_catalog.default_database.MetadataTable], fields=[a, b, c, metadata_1, metadata_2]) -+- Calc(select=[a, b, c, $metadata$metadata_1 AS metadata_1, CAST(CAST($metadata$metadata_2 AS INTEGER) AS BIGINT) AS metadata_2]) - +- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, project=[a, b, c], metadata=[metadata_1, metadata_2]]], fields=[a, b, c, $metadata$metadata_1, $metadata$metadata_2]) ++- Calc(select=[a, b, c, metadata_1, CAST(CAST(m_2 AS INTEGER) AS BIGINT) AS metadata_2]) + +- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, project=[a, b, c], metadata=[metadata_1, metadata_2]]], fields=[a, b, c, metadata_1, m_2]) ]]> </Resource> </TestCase> @@ -504,8 +520,8 @@ LogicalSink(table=[default_catalog.default_database.MetadataTable], fields=[meta </Resource> <Resource name="optimized rel plan"> <![CDATA[ -Sink(table=[default_catalog.default_database.MetadataTable], fields=[metadata_1, metadata_2, other, $metadata$metadata_2]) -+- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, project=[metadata_1, metadata_2, other], metadata=[metadata_2]]], fields=[metadata_1, metadata_2, other, $metadata$metadata_2]) +Sink(table=[default_catalog.default_database.MetadataTable], fields=[metadata_1, metadata_2, other, m_2]) ++- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, project=[metadata_1, metadata_2, other], metadata=[metadata_2]]], fields=[metadata_1, metadata_2, other, m_2]) ]]> </Resource> </TestCase> @@ -763,20 +779,4 @@ Sink(table=[default_catalog.default_database.SinkJoinChangeLog], fields=[person, ]]> </Resource> </TestCase> - <TestCase name="testInsertPartColumn"> - <Resource name="ast"> - <![CDATA[ -LogicalSink(table=[default_catalog.default_database.zm_test], fields=[a, m1, m2, m3, m4]) -+- LogicalProject(a=[CAST($0):BIGINT], m1=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP], m2=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP], m3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP], m4=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", BIGINT) MAP]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) -]]> - </Resource> - <Resource name="optimized rel plan"> - <![CDATA[ -Sink(table=[default_catalog.default_database.zm_test], fields=[a, m1, m2, m3, m4], changelogMode=[NONE]) -+- Calc(select=[CAST(a AS BIGINT) AS a, null:(VARCHAR(2147483647), BIGINT) MAP AS m1, null:(VARCHAR(2147483647), BIGINT) MAP AS m2, null:(VARCHAR(2147483647), BIGINT) MAP AS m3, null:(VARCHAR(2147483647), BIGINT) MAP AS m4], changelogMode=[I]) - +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], changelogMode=[I]) -]]> - </Resource> - </TestCase> </Root> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml index 648a40ad94e..f8009ff2197 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml @@ -79,8 +79,8 @@ LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1. </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + deepNested_nested2_num) + $metadata$metadata_1) AS results]) -+- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]], fields=[id, deepNested_nested1, deepNested_nested2_num, $metadata$metadata_1]) +Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + deepNested_nested2_num) + metadata_1) AS results]) ++- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested_nested1, deepNested_nested2_num], metadata=[metadata_1]]], fields=[id, deepNested_nested1, deepNested_nested2_num, metadata_1]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala index 6cf12b33968..8786641ec6f 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala @@ -18,12 +18,15 @@ package org.apache.flink.table.planner.plan.stream.sql import org.apache.flink.api.scala._ +import org.apache.flink.core.testutils.FlinkAssertions +import org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches import org.apache.flink.table.api._ import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.planner.expressions.utils.Func0 import org.apache.flink.table.planner.factories.TestValuesTableFactory.MockedLookupTableSource import org.apache.flink.table.planner.utils.TableTestBase +import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.Test class TableScanTest extends TableTestBase { @@ -172,6 +175,54 @@ class TableScanTest extends TableTestBase { util.verifyExecPlan("SELECT * FROM src WHERE a > 1") } + @Test + def testDDLWithMultipleColumnsFromSameMetadataKey(): Unit = { + assertThatThrownBy(() => util.tableEnv.executeSql(""" + |CREATE TABLE source ( + | a INT METADATA, + | b INT METADATA FROM 'a' + |) WITH ( + | 'connector' = 'COLLECTION' + |) + |""".stripMargin)).satisfies( + FlinkAssertions.anyCauseMatches( + classOf[ValidationException], + "The column `a` and `b` in the table are both from the same metadata key 'a'. " + + "Please specify one of the columns as the metadata column and use the computed column" + + " syntax to specify the others." + )) + } + + @Test + def testDDLWithMultipleColumnsFromSameMetadataKey2(): Unit = { + util.tableEnv.executeSql(""" + |CREATE TABLE source ( + | a INT METADATA + |) WITH ( + | 'connector' = 'COLLECTION' + |) + |""".stripMargin) + assertThatThrownBy( + () => + util.tableEnv.executeSql( + """ + |CREATE TABLE like_source ( + | b INT METADATA FROM 'a' + |) + |WITH ( + | 'connector' = 'COLLECTION' + |) LIKE source ( + | INCLUDING METADATA + |) + |""".stripMargin + )).satisfies( + anyCauseMatches( + "The column `a` and `b` in the table are both from the same metadata key 'a'. " + + "Please specify one of the columns as the metadata column and use the computed column" + + " syntax to specify the others." + )) + } + @Test def testKeywordsWithWatermarkComputedColumn(): Unit = { // Create table with field as atom expression. diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala index 86d3594a002..e19a145125a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSourceITCase.scala @@ -74,6 +74,7 @@ class TableSourceITCase extends StreamingTestBase { |CREATE TABLE MetadataTable ( | `a` INT, | `other_metadata` INT METADATA FROM 'metadata_3', + | `other_metadata2` AS CAST(`other_metadata` AS BIGINT), | `b` BIGINT, | `metadata_1` INT METADATA, | `computed` AS `metadata_1` * 2, @@ -302,6 +303,19 @@ class TableSourceITCase extends StreamingTestBase { assertEquals(expected.sorted, sink.getAppendResults.sorted) } + @Test + def testDuplicateMetadataFromSameKey(): Unit = { + val result = tEnv + .sqlQuery("SELECT other_metadata, other_metadata2, metadata_2 FROM MetadataTable") + .toAppendStream[Row] + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = Seq("1,1,Hallo", "1,1,Hallo Welt wie", "2,2,Hallo Welt") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } + @Test def testNestedProjectionWithMetadataAccess(): Unit = { val query =