wuchong commented on a change in pull request #14996: URL: https://github.com/apache/flink/pull/14996#discussion_r582702862
########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java ########## @@ -0,0 +1,856 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.Column.ComputedColumn; +import org.apache.flink.table.catalog.Column.MetadataColumn; +import org.apache.flink.table.catalog.Column.PhysicalColumn; +import org.apache.flink.table.catalog.Constraint; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.SchemaResolver; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.catalog.WatermarkSpec; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.SqlCallExpression; +import org.apache.flink.table.types.AbstractDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.utils.EncodingUtils; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot; + +/** + * Schema of a table or view. + * + * <p>A schema represents the schema part of a {@code CREATE TABLE (schema) WITH (options)} DDL + * statement in SQL. It defines columns of different kind, constraints, time attributes, and + * watermark strategies. It is possible to reference objects (such as functions or types) across + * different catalogs. + * + * <p>This class is used in the API and catalogs to define an unresolved schema that will be + * translated to {@link ResolvedSchema}. Some methods of this class perform basic validation, + * however, the main validation happens during the resolution. + * + * <p>Since an instance of this class is unresolved, it should not be directly persisted. The {@link + * #toString()} shows only a summary of the contained objects. + */ +@PublicEvolving +public final class Schema { + + private final List<UnresolvedColumn> columns; + + private final List<UnresolvedWatermarkSpec> watermarkSpecs; + + private final @Nullable UnresolvedPrimaryKey primaryKey; + + private Schema( + List<UnresolvedColumn> columns, + List<UnresolvedWatermarkSpec> watermarkSpecs, + @Nullable UnresolvedPrimaryKey primaryKey) { + this.columns = columns; + this.watermarkSpecs = watermarkSpecs; + this.primaryKey = primaryKey; + } + + /** Builder for configuring and creating instances of {@link Schema}. */ + public static Schema.Builder newBuilder() { + return new Builder(); + } + + public List<UnresolvedColumn> getColumns() { + return columns; + } + + public List<UnresolvedWatermarkSpec> getWatermarkSpecs() { + return watermarkSpecs; + } + + public Optional<UnresolvedPrimaryKey> getPrimaryKey() { + return Optional.ofNullable(primaryKey); + } + + /** Resolves the given {@link Schema} to a validated {@link ResolvedSchema}. */ + public ResolvedSchema resolve(SchemaResolver resolver) { + return resolver.resolve(this); + } + + @Override + public String toString() { + final List<Object> components = new ArrayList<>(); + components.addAll(columns); + components.addAll(watermarkSpecs); + if (primaryKey != null) { + components.add(primaryKey); + } + return components.stream() + .map(Objects::toString) + .map(s -> " " + s) + .collect(Collectors.joining(", \n", "(\n", "\n)")); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Schema schema = (Schema) o; + return columns.equals(schema.columns) + && watermarkSpecs.equals(schema.watermarkSpecs) + && Objects.equals(primaryKey, schema.primaryKey); + } + + @Override + public int hashCode() { + return Objects.hash(columns, watermarkSpecs, primaryKey); + } + + // -------------------------------------------------------------------------------------------- + + /** A builder for constructing an immutable but still unresolved {@link Schema}. */ + public static final class Builder { + + private final List<UnresolvedColumn> columns; + + private final List<UnresolvedWatermarkSpec> watermarkSpecs; + + private @Nullable UnresolvedPrimaryKey primaryKey; + + private Builder() { + columns = new ArrayList<>(); + watermarkSpecs = new ArrayList<>(); + } + + /** Adopts all members from the given unresolved schema. */ + public Builder fromSchema(Schema unresolvedSchema) { + columns.addAll(unresolvedSchema.columns); + watermarkSpecs.addAll(unresolvedSchema.watermarkSpecs); + if (unresolvedSchema.primaryKey != null) { + primaryKeyNamed( + unresolvedSchema.primaryKey.getConstraintName(), + unresolvedSchema.primaryKey.getColumnNames()); + } + return this; + } + + /** Adopts all members from the given resolved schema. */ + public Builder fromResolvedSchema(ResolvedSchema resolvedSchema) { + addResolvedColumns(resolvedSchema.getColumns()); + addResolvedWatermarkSpec(resolvedSchema.getWatermarkSpecs()); + resolvedSchema.getPrimaryKey().ifPresent(this::addResolvedConstraint); + return this; + } + + /** Adopts all fields of the given row as physical columns of the schema. */ + public Builder fromRowDataType(DataType dataType) { + Preconditions.checkNotNull(dataType, "Data type must not be null."); + Preconditions.checkArgument( + hasRoot(dataType.getLogicalType(), LogicalTypeRoot.ROW), + "Data type of ROW expected."); + final List<DataType> fieldDataTypes = dataType.getChildren(); + final List<String> fieldNames = ((RowType) dataType.getLogicalType()).getFieldNames(); + IntStream.range(0, fieldDataTypes.size()) + .forEach(i -> column(fieldNames.get(i), fieldDataTypes.get(i))); + return this; + } + + /** + * Declares a physical column that is appended to this schema. + * + * <p>Physical columns are regular columns known from databases. They define the names, the + * types, and the order of fields in the physical data. Thus, physical columns represent the + * payload that is read from and written to an external system. Connectors and formats use + * these columns (in the defined order) to configure themselves. Other kinds of columns can + * be declared between physical columns but will not influence the final physical schema. + * + * @param columnName column name + * @param dataType data type of the column + */ + public Builder column(String columnName, AbstractDataType<?> dataType) { + Preconditions.checkNotNull(columnName, "Column name must not be null."); + Preconditions.checkNotNull(dataType, "Data type must not be null."); + columns.add(new UnresolvedPhysicalColumn(columnName, dataType)); + return this; + } + + /** + * Declares a physical column that is appended to this schema. + * + * <p>See {@link #column(String, AbstractDataType)} for a detailed explanation. + * + * <p>This method uses a type string that can be easily persisted in a durable catalog. + * + * @param columnName column name + * @param serializableTypeString data type of the column as a serializable string + * @see LogicalType#asSerializableString() + */ + public Builder column(String columnName, String serializableTypeString) { + return column(columnName, DataTypes.of(serializableTypeString)); + } + + /** + * Declares a computed column that is appended to this schema. + * + * <p>Computed columns are virtual columns that are generated by evaluating an expression + * that can reference other columns declared in the same table. Both physical columns and + * metadata columns can be accessed if they precede the computed column in the schema + * declaration. The column itself is not physically stored within the table. The column’s + * data type is derived automatically from the given expression and does not have to be + * declared manually. + * + * <p>Computed columns are commonly used for defining time attributes. For example, the + * computed column can be used if the original field is not TIMESTAMP(3) type or is nested + * in a JSON string. + * + * <p>Any scalar expression can be used for in-memory/temporary tables. However, currently, + * only SQL expressions can be persisted in a catalog. User-defined functions (also defined + * in different catalogs) are supported. + * + * <p>Example: {@code .columnByExpression("ts", $("json_obj").get("ts").cast(TIMESTAMP(3))} + * + * @param columnName column name + * @param expression computation of the column + */ + public Builder columnByExpression(String columnName, Expression expression) { + Preconditions.checkNotNull(columnName, "Column name must not be null."); + Preconditions.checkNotNull(expression, "Expression must not be null."); + columns.add(new UnresolvedComputedColumn(columnName, expression)); + return this; + } + + /** + * Declares a computed column that is appended to this schema. + * + * <p>See {@link #columnByExpression(String, Expression)} for a detailed explanation. + * + * <p>This method uses a SQL expression that can be easily persisted in a durable catalog. + * + * <p>Example: {@code .columnByExpression("ts", "CAST(json_obj.ts AS TIMESTAMP(3))")} + * + * @param columnName column name + * @param sqlExpression computation of the column using SQL + */ + public Builder columnByExpression(String columnName, String sqlExpression) { + return columnByExpression(columnName, new SqlCallExpression(sqlExpression)); + } + + /** + * Declares a metadata column that is appended to this schema. + * + * <p>Metadata columns allow to access connector and/or format specific fields for every row + * of a table. For example, a metadata column can be used to read and write the timestamp + * from and to Kafka records for time-based operations. The connector and format + * documentation lists the available metadata fields for every component. + * + * <p>Every metadata field is identified by a string-based key and has a documented data + * type. For convenience, the runtime will perform an explicit cast if the data type of the + * column differs from the data type of the metadata field. Of course, this requires that + * the two data types are compatible. + * + * <p>By default, a metadata column can be used for both reading and writing. However, in + * many cases an external system provides more read-only metadata fields than writable + * fields. Therefore, it is possible to exclude metadata columns from persisting by setting + * the {@code isVirtual} flag to {@code true}. + * + * <p>Note: This method assumes that the metadata key is equal to the column name. + * + * @param columnName column name + * @param dataType data type of the column + * @param isVirtual whether the column should be persisted or not + */ + public Builder columnByMetadata( + String columnName, AbstractDataType<?> dataType, boolean isVirtual) { + Preconditions.checkNotNull(columnName, "Column name must not be null."); + Preconditions.checkNotNull(dataType, "Data type must not be null."); + columns.add(new UnresolvedMetadataColumn(columnName, dataType, null, isVirtual)); + return this; + } + + /** + * Declares a metadata column that is appended to this schema. + * + * <p>Metadata columns allow to access connector and/or format specific fields for every row + * of a table. For example, a metadata column can be used to read and write the timestamp + * from and to Kafka records for time-based operations. The connector and format + * documentation lists the available metadata fields for every component. + * + * <p>Every metadata field is identified by a string-based key and has a documented data + * type. The metadata key can be omitted if the column name should be used as the + * identifying metadata key. For convenience, the runtime will perform an explicit cast if + * the data type of the column differs from the data type of the metadata field. Of course, + * this requires that the two data types are compatible. + * + * <p>Note: This method assumes that a metadata column can be used for both reading and + * writing. + * + * @param columnName column name + * @param dataType data type of the column + * @param metadataKey identifying metadata key, if null the column name will be used as + * metadata key + */ + public Builder columnByMetadata( + String columnName, AbstractDataType<?> dataType, @Nullable String metadataKey) { + Preconditions.checkNotNull(columnName, "Column name must not be null."); + Preconditions.checkNotNull(dataType, "Data type must not be null."); + columns.add(new UnresolvedMetadataColumn(columnName, dataType, metadataKey, false)); + return this; + } + + /** + * Declares a metadata column that is appended to this schema. + * + * <p>Metadata columns allow to access connector and/or format specific fields for every row + * of a table. For example, a metadata column can be used to read and write the timestamp + * from and to Kafka records for time-based operations. The connector and format + * documentation lists the available metadata fields for every component. + * + * <p>Every metadata field is identified by a string-based key and has a documented data + * type. The metadata key can be omitted if the column name should be used as the + * identifying metadata key. For convenience, the runtime will perform an explicit cast if + * the data type of the column differs from the data type of the metadata field. Of course, + * this requires that the two data types are compatible. + * + * <p>By default, a metadata column can be used for both reading and writing. However, in + * many cases an external system provides more read-only metadata fields than writable + * fields. Therefore, it is possible to exclude metadata columns from persisting by setting + * the {@code isVirtual} flag to {@code true}. + * + * @param columnName column name + * @param dataType data type of the column + * @param metadataKey identifying metadata key, if null the column name will be used as + * metadata key + * @param isVirtual whether the column should be persisted or not + */ + public Builder columnByMetadata( + String columnName, + AbstractDataType<?> dataType, + @Nullable String metadataKey, + boolean isVirtual) { + Preconditions.checkNotNull(columnName, "Column name must not be null."); + columns.add(new UnresolvedMetadataColumn(columnName, dataType, metadataKey, isVirtual)); + return this; + } + + /** + * Declares that the given column should serve as an event-time (i.e. rowtime) attribute and + * specifies a corresponding watermark strategy as an expression. + * + * <p>The column must be of type {@code TIMESTAMP(3)} and be a top-level column in the + * schema. It may be a computed column. + * + * <p>The watermark generation expression is evaluated by the framework for every record + * during runtime. The framework will periodically emit the largest generated watermark. If + * the current watermark is still identical to the previous one, or is null, or the value of + * the returned watermark is smaller than that of the last emitted one, then no new + * watermark will be emitted. A watermark is emitted in an interval defined by the + * configuration. + * + * <p>Any scalar expression can be used for declaring a watermark strategy for + * in-memory/temporary tables. However, currently, only SQL expressions can be persisted in + * a catalog. The expression's return data type must be {@code TIMESTAMP(3)}. User-defined + * functions (also defined in different catalogs) are supported. + * + * <p>Example: {@code .watermark("ts", $("ts).minus(lit(5).seconds())} + * + * @param columnName the column name used as a rowtime attribute + * @param watermarkExpression the expression used for watermark generation + */ + public Builder watermark(String columnName, Expression watermarkExpression) { + Preconditions.checkNotNull(columnName, "Column name must not be null."); + Preconditions.checkNotNull( + watermarkExpression, "Watermark expression must not be null."); + this.watermarkSpecs.add(new UnresolvedWatermarkSpec(columnName, watermarkExpression)); + return this; + } + + /** + * Declares that the given column should serve as an event-time (i.e. rowtime) attribute and + * specifies a corresponding watermark strategy as an expression. + * + * <p>See {@link #watermark(String, Expression)} for a detailed explanation. + * + * <p>This method uses a SQL expression that can be easily persisted in a durable catalog. + * + * <p>Example: {@code .watermark("ts", "ts - INTERVAL '5' SECOND")} + */ + public Builder watermark(String columnName, String sqlExpression) { + return watermark(columnName, new SqlCallExpression(sqlExpression)); + } + + /** + * Declares a primary key constraint for a set of given columns. Primary key uniquely + * identify a row in a table. Neither of columns in a primary can be nullable. The primary + * key is informational only. It will not be enforced. It can be used for optimizations. It + * is the data owner's responsibility to ensure uniqueness of the data. + * + * <p>The primary key will be assigned a random name. + * + * @param columnNames columns that form a unique primary key + */ + public Builder primaryKey(String... columnNames) { + Preconditions.checkNotNull(columnNames, "Primary key column names must not be null."); + return primaryKey(Arrays.asList(columnNames)); + } + + /** + * Declares a primary key constraint for a set of given columns. Primary key uniquely + * identify a row in a table. Neither of columns in a primary can be nullable. The primary + * key is informational only. It will not be enforced. It can be used for optimizations. It + * is the data owner's responsibility to ensure uniqueness of the data. + * + * <p>The primary key will be assigned a random name. + * + * @param columnNames columns that form a unique primary key + */ + public Builder primaryKey(List<String> columnNames) { + return primaryKeyNamed(UUID.randomUUID().toString(), columnNames); Review comment: We have an issue FLINK-16079 which wants to have a deterministic primary key name. What do you think about generate the primary key name by concat the field names, e.g. `PK_f0_f2`. This is also how other databases do. ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/UniqueConstraint.java ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.utils.EncodingUtils; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A unique key constraint. It can be declared also as a PRIMARY KEY. + * + * @see ConstraintType + */ +@PublicEvolving +public final class UniqueConstraint extends AbstractConstraint { Review comment: I just want to make sure will we deprecate the `org.apache.flink.table.api.constraints.UniqueConstraint` in this FLIP? ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java ########## @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Schema.UnresolvedComputedColumn; +import org.apache.flink.table.api.Schema.UnresolvedMetadataColumn; +import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn; +import org.apache.flink.table.api.Schema.UnresolvedPrimaryKey; +import org.apache.flink.table.api.Schema.UnresolvedWatermarkSpec; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.Column.ComputedColumn; +import org.apache.flink.table.catalog.Column.MetadataColumn; +import org.apache.flink.table.catalog.Column.PhysicalColumn; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.LocalReferenceExpression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.TimestampKind; +import org.apache.flink.table.types.logical.TimestampType; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.table.expressions.ApiExpressionUtils.localRef; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute; +import static org.apache.flink.table.types.utils.DataTypeUtils.replaceLogicalType; + +/** Default implementation of {@link SchemaResolver}. */ +@Internal +class DefaultSchemaResolver implements SchemaResolver { + + private final boolean isStreamingMode; + private final boolean supportsMetadata; + private final DataTypeFactory dataTypeFactory; + private final ExpressionResolverBuilder resolverBuilder; + + DefaultSchemaResolver( + boolean isStreamingMode, + boolean supportsMetadata, + DataTypeFactory dataTypeFactory, + ExpressionResolverBuilder resolverBuilder) { + this.isStreamingMode = isStreamingMode; + this.supportsMetadata = supportsMetadata; + this.dataTypeFactory = dataTypeFactory; + this.resolverBuilder = resolverBuilder; + } + + public SchemaResolver withMetadata(boolean supportsMetadata) { + return new DefaultSchemaResolver( + isStreamingMode, supportsMetadata, dataTypeFactory, resolverBuilder); + } + + @Override + public ResolvedSchema resolve(Schema schema) { + final List<Column> columns = resolveColumns(schema.getColumns()); + + final List<WatermarkSpec> watermarkSpecs = + resolveWatermarkSpecs(schema.getWatermarkSpecs(), columns); + + final List<Column> columnsWithRowtime = adjustRowtimeAttributes(watermarkSpecs, columns); + + final UniqueConstraint primaryKey = + resolvePrimaryKey(schema.getPrimaryKey().orElse(null), columnsWithRowtime); + + return new ResolvedSchema(columnsWithRowtime, watermarkSpecs, primaryKey); + } + + @Override + public boolean isStreamingMode() { + return isStreamingMode; + } + + @Override + public boolean supportsMetadata() { + return supportsMetadata; + } + + // -------------------------------------------------------------------------------------------- + + private List<Column> resolveColumns(List<Schema.UnresolvedColumn> unresolvedColumns) { + final List<Column> resolvedColumns = new ArrayList<>(); + for (Schema.UnresolvedColumn unresolvedColumn : unresolvedColumns) { + final Column column; + if (unresolvedColumn instanceof UnresolvedPhysicalColumn) { + column = resolvePhysicalColumn((UnresolvedPhysicalColumn) unresolvedColumn); + } else if (unresolvedColumn instanceof UnresolvedMetadataColumn) { + column = resolveMetadataColumn((UnresolvedMetadataColumn) unresolvedColumn); + } else if (unresolvedColumn instanceof UnresolvedComputedColumn) { + column = + resolveComputedColumn( + (UnresolvedComputedColumn) unresolvedColumn, resolvedColumns); + } else { + throw new IllegalArgumentException("Unknown unresolved column type."); + } + resolvedColumns.add(column); + } + + validateDuplicateColumns(resolvedColumns); + + return resolvedColumns; + } + + private PhysicalColumn resolvePhysicalColumn(UnresolvedPhysicalColumn unresolvedColumn) { + return Column.physical( + unresolvedColumn.getColumnName(), + dataTypeFactory.createDataType(unresolvedColumn.getDataType())); + } + + private MetadataColumn resolveMetadataColumn(UnresolvedMetadataColumn unresolvedColumn) { + if (!supportsMetadata) { + throw new ValidationException( + "Metadata columns are not supported in a schema at the current location."); + } + return Column.metadata( + unresolvedColumn.getColumnName(), + dataTypeFactory.createDataType(unresolvedColumn.getDataType()), + unresolvedColumn.getMetadataKey(), + unresolvedColumn.isVirtual()); + } + + private ComputedColumn resolveComputedColumn( + UnresolvedComputedColumn unresolvedColumn, List<Column> inputColumns) { + final ResolvedExpression resolvedExpression; + try { + resolvedExpression = resolveExpression(inputColumns, unresolvedColumn.getExpression()); + } catch (Exception e) { + throw new ValidationException( + String.format( + "Invalid expression for computed column '%s'.", + unresolvedColumn.getColumnName()), + e); + } + return Column.computed(unresolvedColumn.getColumnName(), resolvedExpression); + } + + private void validateDuplicateColumns(List<Column> columns) { + final List<String> names = + columns.stream().map(Column::getName).collect(Collectors.toList()); + final List<String> duplicates = + names.stream() + .filter(name -> Collections.frequency(names, name) > 1) + .distinct() + .collect(Collectors.toList()); + if (duplicates.size() > 0) { + throw new ValidationException( + String.format( + "Schema must not contain duplicate column names. Found duplicates: %s", + duplicates)); + } + } + + private List<WatermarkSpec> resolveWatermarkSpecs( + List<UnresolvedWatermarkSpec> unresolvedWatermarkSpecs, List<Column> inputColumns) { + if (unresolvedWatermarkSpecs.size() == 0) { + return Collections.emptyList(); + } + if (unresolvedWatermarkSpecs.size() > 1) { + throw new ValidationException("Multiple watermark definitions are not supported yet."); + } + final UnresolvedWatermarkSpec watermarkSpec = unresolvedWatermarkSpecs.get(0); + + // validate time attribute + final String timeColumn = watermarkSpec.getColumnName(); + validateTimeColumn(timeColumn, inputColumns); + + // resolve watermark expression + final ResolvedExpression watermarkExpression; + try { + watermarkExpression = + resolveExpression(inputColumns, watermarkSpec.getWatermarkExpression()); + } catch (Exception e) { + throw new ValidationException( + String.format( + "Invalid expression for watermark '%s'.", watermarkSpec.toString()), + e); + } + validateWatermarkExpression(watermarkExpression.getOutputDataType().getLogicalType()); + + return Collections.singletonList( + new WatermarkSpec(watermarkSpec.getColumnName(), watermarkExpression)); + } + + private void validateTimeColumn(String columnName, List<Column> columns) { + final Optional<Column> timeColumn = + columns.stream().filter(c -> c.getName().equals(columnName)).findFirst(); + if (!timeColumn.isPresent()) { + throw new ValidationException( + String.format( + "Invalid column name '%s' for rowtime attribute in watermark declaration. Available columns are: %s", + columnName, + columns.stream().map(Column::getName).collect(Collectors.toList()))); + } + final LogicalType timeFieldType = timeColumn.get().getDataType().getLogicalType(); + if (!hasRoot(timeFieldType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) + || getPrecision(timeFieldType) != 3) { + throw new ValidationException( + "Invalid data type of time field for watermark definition. " + + "The field must be of type TIMESTAMP(3) WITHOUT TIME ZONE."); + } + if (isProctimeAttribute(timeFieldType)) { + throw new ValidationException( + "A watermark can not be defined for a processing-time attribute."); + } + } + + private void validateWatermarkExpression(LogicalType watermarkType) { + if (!hasRoot(watermarkType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) + || getPrecision(watermarkType) != 3) { + throw new ValidationException( + "Invalid data type of expression for watermark definition. " + + "The field must be of type TIMESTAMP(3) WITHOUT TIME ZONE."); + } + } + + /** Updates the data type of columns that are referenced by {@link WatermarkSpec}. */ + private List<Column> adjustRowtimeAttributes( + List<WatermarkSpec> watermarkSpecs, List<Column> columns) { + return columns.stream() + .map( + column -> { + final String name = column.getName(); + final DataType dataType = column.getDataType(); + final boolean hasWatermarkSpec = + watermarkSpecs.stream() + .anyMatch(s -> s.getRowtimeAttribute().equals(name)); + if (hasWatermarkSpec && isStreamingMode) { + final TimestampType originalType = + (TimestampType) dataType.getLogicalType(); + final LogicalType rowtimeType = + new TimestampType( + originalType.isNullable(), + TimestampKind.ROWTIME, + originalType.getPrecision()); + return column.copy(replaceLogicalType(dataType, rowtimeType)); + } + return column; + }) + .collect(Collectors.toList()); + } + + private @Nullable UniqueConstraint resolvePrimaryKey( + @Nullable UnresolvedPrimaryKey unresolvedPrimaryKey, List<Column> columns) { + if (unresolvedPrimaryKey == null) { + return null; + } + + final UniqueConstraint primaryKey = + UniqueConstraint.primaryKey( + unresolvedPrimaryKey.getConstraintName(), + unresolvedPrimaryKey.getColumnNames()); + + validatePrimaryKey(primaryKey, columns); + + return primaryKey; + } + + private void validatePrimaryKey(UniqueConstraint primaryKey, List<Column> columns) { + final Map<String, Column> columnsByNameLookup = + columns.stream().collect(Collectors.toMap(Column::getName, Function.identity())); + + for (String columnName : primaryKey.getColumns()) { Review comment: We should also check the column names are not duplicated. ########## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java ########## @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.resolver.ExpressionResolver; +import org.apache.flink.table.expressions.utils.ResolvedExpressionMock; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.TimestampKind; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.utils.DataTypeFactoryMock; +import org.apache.flink.table.utils.FunctionLookupMock; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + +import static org.apache.flink.table.api.Expressions.callSql; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute; +import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Tests for {@link Schema}, {@link DefaultSchemaResolver}, and {@link ResolvedSchema}. */ +public class SchemaResolutionTest { + + private static final String COMPUTED_COLUMN_SQL = "orig_ts - INTERVAL '60' MINUTE"; + + private static final ResolvedExpression COMPUTED_COLUMN_RESOLVED = + new ResolvedExpressionMock(DataTypes.TIMESTAMP(3), () -> COMPUTED_COLUMN_SQL); + + private static final String WATERMARK_SQL = "ts - INTERVAL '5' SECOND"; + + private static final ResolvedExpression WATERMARK_RESOLVED = + new ResolvedExpressionMock(DataTypes.TIMESTAMP(3), () -> WATERMARK_SQL); + + private static final String PROCTIME_SQL = "PROCTIME()"; + + private static final ResolvedExpression PROCTIME_RESOLVED = + new ResolvedExpressionMock( + fromLogicalToDataType(new TimestampType(false, TimestampKind.PROCTIME, 3)), + () -> PROCTIME_SQL); + + private static final Schema SCHEMA = + Schema.newBuilder() + .primaryKeyNamed("primary_constraint", "id") // out of order + .column("id", DataTypes.INT().notNull()) + .column("counter", DataTypes.INT().notNull()) + .column("payload", "ROW<name STRING, age INT, flag BOOLEAN>") + .columnByMetadata("topic", DataTypes.STRING(), true) + .columnByMetadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp") + .columnByExpression("ts", callSql(COMPUTED_COLUMN_SQL)) // API expression Review comment: Could you put the computed column `ts` before `orig_ts`? I checked the implementation of `DefaultSchemaResolver#resolveColumns` and thought it doesn't take this case into account. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
