This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new adaf020 [FLINK-21396][table-common] Improve usability of new schema hierarchy adaf020 is described below commit adaf020d24806b3e765298a613008b728a00c2ce Author: Timo Walther <twal...@apache.org> AuthorDate: Fri Mar 5 10:12:54 2021 +0100 [FLINK-21396][table-common] Improve usability of new schema hierarchy This closes #15096. --- .../flink/table/catalog/SchemaResolutionTest.java | 15 +++- .../java/org/apache/flink/table/api/Schema.java | 60 ++++++++++++++- .../org/apache/flink/table/api/TableSchema.java | 87 ++++++++++++++++++++++ .../org/apache/flink/table/catalog/Column.java | 54 ++++---------- .../apache/flink/table/catalog/ResolvedSchema.java | 2 +- 5 files changed, 172 insertions(+), 46 deletions(-) diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java index 3e7c029..7ff5791 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java @@ -95,9 +95,10 @@ public class SchemaResolutionTest { DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("age", DataTypes.INT()), DataTypes.FIELD("flag", DataTypes.BOOLEAN()))), - Column.metadata("topic", DataTypes.STRING(), true), + Column.metadata("topic", DataTypes.STRING(), null, true), Column.computed("ts", COMPUTED_COLUMN_RESOLVED), - Column.metadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp"), + Column.metadata( + "orig_ts", DataTypes.TIMESTAMP(3), "timestamp", false), Column.computed("proctime", PROCTIME_RESOLVED)), Collections.singletonList(new WatermarkSpec("ts", WATERMARK_RESOLVED)), UniqueConstraint.primaryKey( @@ -291,7 +292,7 @@ public class SchemaResolutionTest { @Test public void testSourceRowDataType() { - final ResolvedSchema resolvedSchema = resolveSchema(SCHEMA, true, true); + final ResolvedSchema resolvedSchema = resolveSchema(SCHEMA); final DataType expectedDataType = DataTypes.ROW( DataTypes.FIELD("id", DataTypes.INT().notNull()), @@ -310,6 +311,14 @@ public class SchemaResolutionTest { assertThat(resolvedSchema.toSourceRowDataType(), equalTo(expectedDataType)); } + @Test + public void testLegacySchemaCompatibility() { + final ResolvedSchema resolvedSchema = resolveSchema(SCHEMA); + final ResolvedSchema resolvedSchemaFromLegacy = + resolveSchema(TableSchema.fromResolvedSchema(resolvedSchema).toSchema()); + assertThat(resolvedSchemaFromLegacy, equalTo(resolvedSchema)); + } + // -------------------------------------------------------------------------------------------- private static void testError(Schema schema, String errorMessage) { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java index d749a81..46e9940 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java @@ -302,6 +302,23 @@ public final class Schema { /** * Declares a metadata column that is appended to this schema. * + * <p>See {@link #columnByMetadata(String, AbstractDataType, boolean)} for a detailed + * explanation. + * + * <p>This method uses a type string that can be easily persisted in a durable catalog. + * + * @param columnName column name + * @param serializableTypeString data type of the column + * @param isVirtual whether the column should be persisted or not + */ + public Builder columnByMetadata( + String columnName, String serializableTypeString, boolean isVirtual) { + return columnByMetadata(columnName, DataTypes.of(serializableTypeString), isVirtual); + } + + /** + * Declares a metadata column that is appended to this schema. + * * <p>Metadata columns allow to access connector and/or format specific fields for every row * of a table. For example, a metadata column can be used to read and write the timestamp * from and to Kafka records for time-based operations. The connector and format @@ -332,6 +349,24 @@ public final class Schema { /** * Declares a metadata column that is appended to this schema. * + * <p>See {@link #columnByMetadata(String, AbstractDataType, String)} for a detailed + * explanation. + * + * <p>This method uses a type string that can be easily persisted in a durable catalog. + * + * @param columnName column name + * @param serializableTypeString data type of the column + * @param metadataKey identifying metadata key, if null the column name will be used as + * metadata key + */ + public Builder columnByMetadata( + String columnName, String serializableTypeString, @Nullable String metadataKey) { + return columnByMetadata(columnName, DataTypes.of(serializableTypeString), metadataKey); + } + + /** + * Declares a metadata column that is appended to this schema. + * * <p>Metadata columns allow to access connector and/or format specific fields for every row * of a table. For example, a metadata column can be used to read and write the timestamp * from and to Kafka records for time-based operations. The connector and format @@ -365,6 +400,29 @@ public final class Schema { } /** + * Declares a metadata column that is appended to this schema. + * + * <p>See {@link #columnByMetadata(String, AbstractDataType, String, boolean)} for a + * detailed explanation. + * + * <p>This method uses a type string that can be easily persisted in a durable catalog. + * + * @param columnName column name + * @param serializableTypeString data type of the column + * @param metadataKey identifying metadata key, if null the column name will be used as + * metadata key + * @param isVirtual whether the column should be persisted or not + */ + public Builder columnByMetadata( + String columnName, + String serializableTypeString, + @Nullable String metadataKey, + boolean isVirtual) { + return columnByMetadata( + columnName, DataTypes.of(serializableTypeString), metadataKey, isVirtual); + } + + /** * Declares that the given column should serve as an event-time (i.e. rowtime) attribute and * specifies a corresponding watermark strategy as an expression. * @@ -502,7 +560,7 @@ public final class Schema { columnByMetadata( metadataColumn.getName(), metadataColumn.getDataType(), - metadataColumn.getMetadataAlias().orElse(null), + metadataColumn.getMetadataKey().orElse(null), metadataColumn.isVirtual()); } }); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java index af08a58..412ca5f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java @@ -21,7 +21,12 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.table.api.TableColumn.ComputedColumn; +import org.apache.flink.table.api.TableColumn.MetadataColumn; +import org.apache.flink.table.api.TableColumn.PhysicalColumn; import org.apache.flink.table.api.constraints.UniqueConstraint; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LegacyTypeInformationType; import org.apache.flink.table.types.logical.LogicalType; @@ -298,6 +303,40 @@ public class TableSchema { return Optional.ofNullable(primaryKey); } + /** Helps to migrate to the new {@link Schema} class. */ + public Schema toSchema() { + final Schema.Builder builder = Schema.newBuilder(); + + columns.forEach( + column -> { + if (column instanceof PhysicalColumn) { + final PhysicalColumn c = (PhysicalColumn) column; + builder.column(c.getName(), c.getType()); + } else if (column instanceof MetadataColumn) { + final MetadataColumn c = (MetadataColumn) column; + builder.columnByMetadata( + c.getName(), + c.getType(), + c.getMetadataAlias().orElse(null), + c.isVirtual()); + } else if (column instanceof ComputedColumn) { + final ComputedColumn c = (ComputedColumn) column; + builder.columnByExpression(c.getName(), c.getExpression()); + } else { + throw new IllegalArgumentException("Unsupported column type: " + column); + } + }); + + watermarkSpecs.forEach( + spec -> builder.watermark(spec.getRowtimeAttribute(), spec.getWatermarkExpr())); + + if (primaryKey != null) { + builder.primaryKeyNamed(primaryKey.getName(), primaryKey.getColumns()); + } + + return builder.build(); + } + @Override public String toString() { final StringBuilder sb = new StringBuilder(); @@ -369,6 +408,54 @@ public class TableSchema { } } + /** Helps to migrate to the new {@link ResolvedSchema} to old API methods. */ + public static TableSchema fromResolvedSchema(ResolvedSchema resolvedSchema) { + final TableSchema.Builder builder = TableSchema.builder(); + + resolvedSchema.getColumns().stream() + .map( + column -> { + if (column instanceof Column.PhysicalColumn) { + final Column.PhysicalColumn c = (Column.PhysicalColumn) column; + return TableColumn.physical(c.getName(), c.getDataType()); + } else if (column instanceof Column.MetadataColumn) { + final Column.MetadataColumn c = (Column.MetadataColumn) column; + return TableColumn.metadata( + c.getName(), + c.getDataType(), + c.getMetadataKey().orElse(null), + c.isVirtual()); + } else if (column instanceof Column.ComputedColumn) { + final Column.ComputedColumn c = (Column.ComputedColumn) column; + return TableColumn.computed( + c.getName(), + c.getDataType(), + c.getExpression().asSerializableString()); + } + throw new IllegalArgumentException( + "Unsupported column type: " + column); + }) + .forEach(builder::add); + + resolvedSchema + .getWatermarkSpecs() + .forEach( + spec -> + builder.watermark( + spec.getRowtimeAttribute(), + spec.getWatermarkExpression().asSerializableString(), + spec.getWatermarkExpression().getOutputDataType())); + + resolvedSchema + .getPrimaryKey() + .ifPresent( + pk -> + builder.primaryKey( + pk.getName(), pk.getColumns().toArray(new String[0]))); + + return builder.build(); + } + public static Builder builder() { return new Builder(); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java index 019a9ef..73658c4 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java @@ -65,44 +65,16 @@ public abstract class Column { } /** - * Creates a metadata column from metadata of the given column name. - * - * <p>The column is not virtual by default. - */ - public static MetadataColumn metadata(String name, DataType dataType) { - return metadata(name, dataType, null, false); - } - - /** - * Creates a metadata column from metadata of the given column name. - * - * <p>Allows to specify whether the column is virtual or not. - */ - public static MetadataColumn metadata(String name, DataType type, boolean isVirtual) { - return metadata(name, type, null, isVirtual); - } - - /** - * Creates a metadata column from metadata of the given alias. - * - * <p>The column is not virtual by default. - */ - public static MetadataColumn metadata(String name, DataType type, String metadataAlias) { - Preconditions.checkNotNull(metadataAlias, "Metadata alias can not be null."); - return metadata(name, type, metadataAlias, false); - } - - /** * Creates a metadata column from metadata of the given column name or from metadata of the - * given alias (if not null). + * given key (if not null). * * <p>Allows to specify whether the column is virtual or not. */ public static MetadataColumn metadata( - String name, DataType dataType, @Nullable String metadataAlias, boolean isVirtual) { + String name, DataType dataType, @Nullable String metadataKey, boolean isVirtual) { Preconditions.checkNotNull(name, "Column name can not be null."); Preconditions.checkNotNull(dataType, "Column data type can not be null."); - return new MetadataColumn(name, dataType, metadataAlias, isVirtual); + return new MetadataColumn(name, dataType, metadataKey, isVirtual); } /** @@ -257,14 +229,14 @@ public abstract class Column { /** Representation of a metadata column. */ public static final class MetadataColumn extends Column { - private final @Nullable String metadataAlias; + private final @Nullable String metadataKey; private final boolean isVirtual; private MetadataColumn( - String name, DataType dataType, @Nullable String metadataAlias, boolean isVirtual) { + String name, DataType dataType, @Nullable String metadataKey, boolean isVirtual) { super(name, dataType); - this.metadataAlias = metadataAlias; + this.metadataKey = metadataKey; this.isVirtual = isVirtual; } @@ -272,8 +244,8 @@ public abstract class Column { return isVirtual; } - public Optional<String> getMetadataAlias() { - return Optional.ofNullable(metadataAlias); + public Optional<String> getMetadataKey() { + return Optional.ofNullable(metadataKey); } @Override @@ -290,10 +262,10 @@ public abstract class Column { public Optional<String> explainExtras() { final StringBuilder sb = new StringBuilder(); sb.append("METADATA"); - if (metadataAlias != null) { + if (metadataKey != null) { sb.append(" FROM "); sb.append("'"); - sb.append(EncodingUtils.escapeSingleQuotes(metadataAlias)); + sb.append(EncodingUtils.escapeSingleQuotes(metadataKey)); sb.append("'"); } if (isVirtual) { @@ -304,7 +276,7 @@ public abstract class Column { @Override public Column copy(DataType newDataType) { - return new MetadataColumn(name, newDataType, metadataAlias, isVirtual); + return new MetadataColumn(name, newDataType, metadataKey, isVirtual); } @Override @@ -319,12 +291,12 @@ public abstract class Column { return false; } MetadataColumn that = (MetadataColumn) o; - return isVirtual == that.isVirtual && Objects.equals(metadataAlias, that.metadataAlias); + return isVirtual == that.isVirtual && Objects.equals(metadataKey, that.metadataKey); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), metadataAlias, isVirtual); + return Objects.hash(super.hashCode(), metadataKey, isVirtual); } } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java index 1a1ec01..417bfa6 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java @@ -57,7 +57,7 @@ public final class ResolvedSchema { private final List<WatermarkSpec> watermarkSpecs; private final @Nullable UniqueConstraint primaryKey; - ResolvedSchema( + public ResolvedSchema( List<Column> columns, List<WatermarkSpec> watermarkSpecs, @Nullable UniqueConstraint primaryKey) {