This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 16c2e46 [FLINK-22426][table] Fix several shortcomings that prevent schema expressions 16c2e46 is described below commit 16c2e467b66d6d1c7b8b847e923c6823056181a2 Author: Timo Walther <twal...@apache.org> AuthorDate: Fri Apr 23 14:09:13 2021 +0200 [FLINK-22426][table] Fix several shortcomings that prevent schema expressions This fixes a couple of critical bugs in the stack that prevented Table API expressions from being used for schema declaration. Due to time constraints, this PR could not make it into the 1.13 release but should be added to the next bugfix release for a smoother user experience. For testing and consistency, it exposes the Table API expression sourceWatermark() in the Java, Scala, and Python APIs. This closes #15798 --- flink-python/pyflink/table/expressions.py | 14 ++++ .../org/apache/flink/table/api/Expressions.java | 16 ++++ .../table/catalog/QueryOperationCatalogView.java | 53 ++++++++----- .../resolver/rules/ResolveCallByArgumentsRule.java | 33 ++++---- .../flink/table/catalog/SchemaResolutionTest.java | 89 +++++++++------------ .../table/api/ImplicitExpressionConversions.scala | 27 +++++-- .../table/types/inference/TypeInferenceUtil.java | 81 ++++++++----------- .../inference/InputTypeStrategiesTestBase.java | 2 +- .../table/planner/catalog/CatalogSchemaTable.java | 56 ++++--------- .../table/planner/connectors/DynamicSinkUtils.java | 91 ++++++++++------------ .../planner/connectors/DynamicSourceUtils.java | 12 +-- .../expressions/converter/ExpressionConverter.java | 15 +++- .../plan/nodes/exec/common/CommonExecSink.java | 50 ++++++++---- .../flink/table/planner/utils/ShortcutUtils.java | 8 ++ .../table/planner/calcite/FlinkTypeFactory.scala | 16 ++++ .../table/planner/delegation/PlannerBase.scala | 10 +-- .../plan/schema/LegacyCatalogSourceTable.scala | 2 +- .../flink/table/planner/sinks/TableSinkUtils.scala | 11 --- .../runtime/stream/sql/DataStreamJavaITCase.java | 17 ++-- 19 
files changed, 331 insertions(+), 272 deletions(-) diff --git a/flink-python/pyflink/table/expressions.py b/flink-python/pyflink/table/expressions.py index 179fac4..0b3a60b 100644 --- a/flink-python/pyflink/table/expressions.py +++ b/flink-python/pyflink/table/expressions.py @@ -477,6 +477,20 @@ def log(v, base=None) -> Expression[float]: return _binary_op("log", base, v) +def source_watermark() -> Expression: + """ + Source watermark declaration for schema. + + This is a marker function that doesn't have concrete runtime implementation. It can only + be used as a single expression for watermark strategies in schema declarations. The declaration + will be pushed down into a table source that implements the `SupportsSourceWatermark` + interface. The source will emit system-defined watermarks afterwards. + + Please check the documentation whether the connector supports source watermarks. + """ + return _leaf_op("sourceWatermark") + + def if_then_else(condition: Union[bool, Expression[bool]], if_true, if_false) -> Expression: """ Ternary conditional operator that decides which of two other expressions should be evaluated diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java index 0444287..b88be0f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java @@ -20,6 +20,7 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark; import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ResolvedExpression; @@ -465,6 
+466,21 @@ public final class Expressions { } /** + * Source watermark declaration for {@link Schema}. + * + * <p>This is a marker function that doesn't have concrete runtime implementation. It can only + * be used as a single expression in {@link Schema.Builder#watermark(String, Expression)}. The + * declaration will be pushed down into a table source that implements the {@link + * SupportsSourceWatermark} interface. The source will emit system-defined watermarks + * afterwards. + * + * <p>Please check the documentation whether the connector supports source watermarks. + */ + public static ApiExpression sourceWatermark() { + return apiCall(BuiltInFunctionDefinitions.SOURCE_WATERMARK); + } + + /** * Ternary conditional operator that decides which of two other expressions should be evaluated * based on a evaluated boolean condition. * diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java index 5559813..dc4f0a7 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java @@ -19,31 +19,21 @@ package org.apache.flink.table.catalog; import org.apache.flink.annotation.Internal; -import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.operations.QueryOperation; -import java.util.HashMap; +import java.util.Map; import java.util.Optional; -/** - * A view created from a {@link QueryOperation} via operations on {@link - * org.apache.flink.table.api.Table}. - */ +/** A view created from a {@link QueryOperation} via operations on {@link Table}. 
*/ @Internal -public class QueryOperationCatalogView extends AbstractCatalogView { +public final class QueryOperationCatalogView implements CatalogView { + private final QueryOperation queryOperation; public QueryOperationCatalogView(QueryOperation queryOperation) { - this(queryOperation, ""); - } - - public QueryOperationCatalogView(QueryOperation queryOperation, String comment) { - super( - queryOperation.asSummaryString(), - queryOperation.asSummaryString(), - TableSchema.fromResolvedSchema(queryOperation.getResolvedSchema()), - new HashMap<>(), - comment); this.queryOperation = queryOperation; } @@ -52,8 +42,23 @@ public class QueryOperationCatalogView extends AbstractCatalogView { } @Override + public Schema getUnresolvedSchema() { + return Schema.newBuilder().fromResolvedSchema(queryOperation.getResolvedSchema()).build(); + } + + @Override + public Map<String, String> getOptions() { + throw new TableException("A view backed by a query operation has no options."); + } + + @Override + public String getComment() { + return queryOperation.asSummaryString(); + } + + @Override public QueryOperationCatalogView copy() { - return new QueryOperationCatalogView(this.queryOperation, getComment()); + return new QueryOperationCatalogView(queryOperation); } @Override @@ -65,4 +70,16 @@ public class QueryOperationCatalogView extends AbstractCatalogView { public Optional<String> getDetailedDescription() { return getDescription(); } + + @Override + public String getOriginalQuery() { + throw new TableException( + "A view backed by a query operation has no serializable representation."); + } + + @Override + public String getExpandedQuery() { + throw new TableException( + "A view backed by a query operation has no serializable representation."); + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java index d29e31e..9a7bca6 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java @@ -81,8 +81,11 @@ final class ResolveCallByArgumentsRule implements ResolverRule { @Override public List<Expression> apply(List<Expression> expression, ResolutionContext context) { + // only the top-level expressions may access the output data type + final SurroundingInfo surroundingInfo = + context.getOutputDataType().map(SurroundingInfo::of).orElse(null); return expression.stream() - .flatMap(expr -> expr.accept(new ResolvingCallVisitor(context, null)).stream()) + .flatMap(e -> e.accept(new ResolvingCallVisitor(context, surroundingInfo)).stream()) .collect(Collectors.toList()); } @@ -120,23 +123,23 @@ final class ResolveCallByArgumentsRule implements ResolverRule { // resolve the children with information from the current call final List<ResolvedExpression> resolvedArgs = new ArrayList<>(); final int argCount = unresolvedCall.getChildren().size(); + for (int i = 0; i < argCount; i++) { final int currentPos = i; + final SurroundingInfo surroundingInfo = + typeInference + .map( + inference -> + SurroundingInfo.of( + name, + definition, + inference, + argCount, + currentPos, + resolutionContext.isGroupedAggregation())) + .orElse(null); final ResolvingCallVisitor childResolver = - new ResolvingCallVisitor( - resolutionContext, - typeInference - .map( - inference -> - new SurroundingInfo( - name, - definition, - inference, - argCount, - currentPos, - resolutionContext - .isGroupedAggregation())) - .orElse(null)); + new ResolvingCallVisitor(resolutionContext, surroundingInfo); 
resolvedArgs.addAll(unresolvedCall.getChildren().get(i).accept(childResolver)); } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java index 6e0edd6..316c24a 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java @@ -21,8 +21,11 @@ package org.apache.flink.table.catalog; import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; +import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.utils.ResolvedExpressionMock; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionIdentifier; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; @@ -39,6 +42,7 @@ import java.util.Arrays; import java.util.Collections; import static org.apache.flink.table.api.Expressions.callSql; +import static org.apache.flink.table.api.Expressions.sourceWatermark; import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute; import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute; import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isTimeAttribute; @@ -90,23 +94,21 @@ public class SchemaResolutionTest { // the type of ts_ltz is TIMESTAMP_LTZ private static final String COMPUTED_SQL_WITH_TS_LTZ = "ts_ltz - INTERVAL '60' MINUTE"; + private static final ResolvedExpression 
COMPUTED_COLUMN_RESOLVED_WITH_TS_LTZ = new ResolvedExpressionMock(DataTypes.TIMESTAMP_LTZ(3), () -> COMPUTED_SQL_WITH_TS_LTZ); + private static final String WATERMARK_SQL_WITH_TS_LTZ = "ts1 - INTERVAL '5' SECOND"; + private static final ResolvedExpression WATERMARK_RESOLVED_WITH_TS_LTZ = new ResolvedExpressionMock(DataTypes.TIMESTAMP_LTZ(3), () -> WATERMARK_SQL_WITH_TS_LTZ); + private static final Schema SCHEMA_WITH_TS_LTZ = Schema.newBuilder() - .primaryKeyNamed("primary_constraint", "id") // out of order .column("id", DataTypes.INT().notNull()) - .column("counter", DataTypes.INT().notNull()) - .column("payload", "ROW<name STRING, age INT, flag BOOLEAN>") - .columnByMetadata("topic", DataTypes.STRING(), true) - .columnByExpression( - "ts1", callSql(COMPUTED_SQL_WITH_TS_LTZ)) // out of order API expression + .columnByExpression("ts1", callSql(COMPUTED_SQL_WITH_TS_LTZ)) .columnByMetadata("ts_ltz", DataTypes.TIMESTAMP_LTZ(3), "timestamp") .watermark("ts1", WATERMARK_SQL_WITH_TS_LTZ) - .columnByExpression("proctime", PROCTIME_SQL) .build(); @Test @@ -152,39 +154,54 @@ public class SchemaResolutionTest { new ResolvedSchema( Arrays.asList( Column.physical("id", DataTypes.INT().notNull()), - Column.physical("counter", DataTypes.INT().notNull()), - Column.physical( - "payload", - DataTypes.ROW( - DataTypes.FIELD("name", DataTypes.STRING()), - DataTypes.FIELD("age", DataTypes.INT()), - DataTypes.FIELD("flag", DataTypes.BOOLEAN()))), - Column.metadata("topic", DataTypes.STRING(), null, true), Column.computed("ts1", COMPUTED_COLUMN_RESOLVED_WITH_TS_LTZ), Column.metadata( - "ts_ltz", DataTypes.TIMESTAMP_LTZ(3), "timestamp", false), - Column.computed("proctime", PROCTIME_RESOLVED)), + "ts_ltz", DataTypes.TIMESTAMP_LTZ(3), "timestamp", false)), Collections.singletonList( WatermarkSpec.of("ts1", WATERMARK_RESOLVED_WITH_TS_LTZ)), - UniqueConstraint.primaryKey( - "primary_constraint", Collections.singletonList("id"))); + null); final ResolvedSchema actualStreamSchema = 
resolveSchema(SCHEMA_WITH_TS_LTZ, true); { assertThat(actualStreamSchema, equalTo(expectedSchema)); assertTrue(isRowtimeAttribute(getType(actualStreamSchema, "ts1"))); - assertTrue(isProctimeAttribute(getType(actualStreamSchema, "proctime"))); } final ResolvedSchema actualBatchSchema = resolveSchema(SCHEMA_WITH_TS_LTZ, false); { assertThat(actualBatchSchema, equalTo(expectedSchema)); assertFalse(isRowtimeAttribute(getType(actualBatchSchema, "ts1"))); - assertTrue(isProctimeAttribute(getType(actualBatchSchema, "proctime"))); } } @Test + public void testSchemaResolutionWithSourceWatermark() { + final ResolvedSchema expectedSchema = + new ResolvedSchema( + Collections.singletonList( + Column.physical("ts_ltz", DataTypes.TIMESTAMP_LTZ(1))), + Collections.singletonList( + WatermarkSpec.of( + "ts_ltz", + new CallExpression( + FunctionIdentifier.of( + BuiltInFunctionDefinitions.SOURCE_WATERMARK + .getName()), + BuiltInFunctionDefinitions.SOURCE_WATERMARK, + Collections.emptyList(), + DataTypes.TIMESTAMP_LTZ(1)))), + null); + final ResolvedSchema resolvedSchema = + resolveSchema( + Schema.newBuilder() + .column("ts_ltz", DataTypes.TIMESTAMP_LTZ(1)) + .watermark("ts_ltz", sourceWatermark()) + .build()); + + assertThat(resolvedSchema, equalTo(expectedSchema)); + } + + @Test public void testSchemaResolutionErrors() { // columns @@ -282,20 +299,6 @@ public class SchemaResolutionTest { + " WATERMARK FOR `ts` AS [ts - INTERVAL '5' SECOND],\n" + " CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n" + ")")); - assertThat( - SCHEMA_WITH_TS_LTZ.toString(), - equalTo( - "(\n" - + " `id` INT NOT NULL,\n" - + " `counter` INT NOT NULL,\n" - + " `payload` [ROW<name STRING, age INT, flag BOOLEAN>],\n" - + " `topic` METADATA VIRTUAL,\n" - + " `ts1` AS [ts_ltz - INTERVAL '60' MINUTE],\n" - + " `ts_ltz` METADATA FROM 'timestamp',\n" - + " `proctime` AS [PROCTIME()],\n" - + " WATERMARK FOR `ts1` AS [ts1 - INTERVAL '5' SECOND],\n" - + " CONSTRAINT `primary_constraint` PRIMARY 
KEY (`id`) NOT ENFORCED\n" - + ")")); } @Test @@ -315,22 +318,6 @@ public class SchemaResolutionTest { + " WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '5' SECOND,\n" + " CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n" + ")")); - - final ResolvedSchema resolvedSchemaWithTsLtz = resolveSchema(SCHEMA_WITH_TS_LTZ); - assertThat( - resolvedSchemaWithTsLtz.toString(), - equalTo( - "(\n" - + " `id` INT NOT NULL,\n" - + " `counter` INT NOT NULL,\n" - + " `payload` ROW<`name` STRING, `age` INT, `flag` BOOLEAN>,\n" - + " `topic` STRING METADATA VIRTUAL,\n" - + " `ts1` TIMESTAMP_LTZ(3) *ROWTIME* AS ts_ltz - INTERVAL '60' MINUTE,\n" - + " `ts_ltz` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',\n" - + " `proctime` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME(),\n" - + " WATERMARK FOR `ts1`: TIMESTAMP_LTZ(3) AS ts1 - INTERVAL '5' SECOND,\n" - + " CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n" - + ")")); } @Test diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala index 552ca9d..dd655b0 100644 --- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala +++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala @@ -20,10 +20,11 @@ package org.apache.flink.table.api import org.apache.flink.annotation.PublicEvolving import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark import org.apache.flink.table.expressions.ApiExpressionUtils.{unresolvedCall, unresolvedRef, valueLiteral} import org.apache.flink.table.expressions.{ApiExpressionUtils, Expression, TableSymbol, TimePointUnit} import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{DISTINCT, 
RANGE_TO} -import org.apache.flink.table.functions.{ScalarFunction, TableFunction, ImperativeAggregateFunction, UserDefinedFunctionHelper, _} +import org.apache.flink.table.functions.{ImperativeAggregateFunction, ScalarFunction, TableFunction, UserDefinedFunctionHelper, _} import org.apache.flink.table.types.DataType import org.apache.flink.types.Row @@ -464,12 +465,9 @@ trait ImplicitExpressionConversions { /** * Converts a numeric type epoch time to [[DataTypes#TIMESTAMP_LTZ]]. * - * <p>The supported precision is 0 or 3: - * - * <ul> - * <li>0 means the numericEpochTime is in second. - * <li>3 means the numericEpochTime is in millisecond. - * </ul> + * The supported precision is 0 or 3: + * - 0 means the numericEpochTime is in second. + * - 3 means the numericEpochTime is in millisecond. */ def toTimestampLtz(numericEpochTime: Expression, precision: Expression): Expression = { Expressions.toTimestampLtz(numericEpochTime, precision) @@ -682,6 +680,21 @@ trait ImplicitExpressionConversions { } /** + * Source watermark declaration for [[Schema]]. + * + * This is a marker function that doesn't have concrete runtime implementation. + * It can only be used as a single expression in [[Schema.Builder#watermark(String, Expression)]]. + * The declaration will be pushed down into a table source that implements the + * [[SupportsSourceWatermark]] interface. The source will emit system-defined watermarks + * afterwards. + * + * Please check the documentation whether the connector supports source watermarks. + */ + def sourceWatermark(): Expression = { + Expressions.sourceWatermark() + } + + /** * Ternary conditional operator that decides which of two other expressions should be evaluated * based on a evaluated boolean condition. 
* diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java index 1a045ae..6e36e31 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java @@ -216,63 +216,50 @@ public final class TypeInferenceUtil { * * @see CallContext#getOutputDataType() */ - public static final class SurroundingInfo { + public interface SurroundingInfo { - private final String name; - - private final FunctionDefinition functionDefinition; - - private final TypeInference typeInference; - - private final int argumentCount; - - private final int innerCallPosition; - - private final boolean isGroupedAggregation; - - public SurroundingInfo( + static SurroundingInfo of( String name, FunctionDefinition functionDefinition, TypeInference typeInference, int argumentCount, int innerCallPosition, boolean isGroupedAggregation) { - this.name = name; - this.functionDefinition = functionDefinition; - this.typeInference = typeInference; - this.argumentCount = argumentCount; - this.innerCallPosition = innerCallPosition; - this.isGroupedAggregation = isGroupedAggregation; + return typeFactory -> { + final boolean isValidCount = + validateArgumentCount( + typeInference.getInputTypeStrategy().getArgumentCount(), + argumentCount, + false); + if (!isValidCount) { + return Optional.empty(); + } + // for "takes_string(this_function(NULL))" simulate "takes_string(NULL)" + // for retrieving the output type of "this_function(NULL)" + final CallContext callContext = + new UnknownCallContext( + typeFactory, + name, + functionDefinition, + argumentCount, + isGroupedAggregation); + + // We might not be able to infer the input types at this moment, if the surrounding + // function does 
not provide an explicit input type strategy. + final CallContext adaptedContext = + adaptArguments(typeInference, callContext, null, false); + return typeInference + .getInputTypeStrategy() + .inferInputTypes(adaptedContext, false) + .map(dataTypes -> dataTypes.get(innerCallPosition)); + }; } - private Optional<DataType> inferOutputType(DataTypeFactory typeFactory) { - final boolean isValidCount = - validateArgumentCount( - typeInference.getInputTypeStrategy().getArgumentCount(), - argumentCount, - false); - if (!isValidCount) { - return Optional.empty(); - } - // for "takes_string(this_function(NULL))" simulate "takes_string(NULL)" - // for retrieving the output type of "this_function(NULL)" - final CallContext callContext = - new UnknownCallContext( - typeFactory, - name, - functionDefinition, - argumentCount, - isGroupedAggregation); - - // We might not be able to infer the input types at this moment, if the surrounding - // function does not provide an explicit input type strategy. - final CallContext adaptedContext = - adaptArguments(typeInference, callContext, null, false); - return typeInference - .getInputTypeStrategy() - .inferInputTypes(adaptedContext, false) - .map(dataTypes -> dataTypes.get(innerCallPosition)); + static SurroundingInfo of(DataType dataType) { + return typeFactory -> Optional.of(dataType); } + + Optional<DataType> inferOutputType(DataTypeFactory typeFactory); } /** diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java index 8e9f66d..52786ef 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java @@ -117,7 +117,7 @@ public abstract class 
InputTypeStrategiesTestBase { .outputTypeStrategy(TypeStrategies.MISSING) .build(); surroundingInfo = - new TypeInferenceUtil.SurroundingInfo( + TypeInferenceUtil.SurroundingInfo.of( "f_outer", functionDefinitionMock, outerTypeInference, diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java index 08d9959..c5ea4ab 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java @@ -20,30 +20,29 @@ package org.apache.flink.table.planner.catalog; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableColumn; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogManager.TableLookupResult; import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ConnectorCatalogTable; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.factories.TableFactoryUtil; import org.apache.flink.table.factories.TableSourceFactory; import org.apache.flink.table.factories.TableSourceFactoryContextImpl; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.plan.stats.FlinkStatistic; import 
org.apache.flink.table.planner.sources.TableSourceUtil; +import org.apache.flink.table.runtime.types.PlannerTypeUtils; import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.sources.TableSourceValidation; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.TimestampType; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; @@ -52,9 +51,7 @@ import org.apache.calcite.schema.impl.AbstractTable; import java.util.List; import java.util.Optional; - -import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute; -import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isTimeAttribute; +import java.util.stream.Collectors; /** * Represents a wrapper for {@link CatalogBaseTable} in {@link org.apache.calcite.schema.Schema}. @@ -124,34 +121,7 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable { @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory; - TableSchema tableSchema = TableSchema.fromResolvedSchema(lookupResult.getResolvedSchema()); - final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes(); - CatalogBaseTable catalogTable = lookupResult.getTable(); - if (!isStreamingMode - && catalogTable instanceof ConnectorCatalogTable - && ((ConnectorCatalogTable<?, ?>) catalogTable).getTableSource().isPresent()) { - // If the table source is bounded, materialize the time attributes to normal TIMESTAMP - // type. - // Now for ConnectorCatalogTable, there is no way to - // deduce if it is bounded in the table environment, so the data types in TableSchema - // always patched with TimeAttribute. 
- // See ConnectorCatalogTable#calculateSourceSchema - // for details. - - // Remove the patched time attributes type to let the TableSourceTable handle it. - // We should remove this logic if the isBatch flag in ConnectorCatalogTable is fixed. - // TODO: Fix FLINK-14844. - for (int i = 0; i < fieldDataTypes.length; i++) { - LogicalType lt = fieldDataTypes[i].getLogicalType(); - if (lt instanceof TimestampType && isRowtimeAttribute(lt)) { - int precision = ((TimestampType) lt).getPrecision(); - fieldDataTypes[i] = DataTypes.TIMESTAMP(precision); - } else if (lt instanceof LocalZonedTimestampType && isTimeAttribute(lt)) { - int precision = ((LocalZonedTimestampType) lt).getPrecision(); - fieldDataTypes[i] = DataTypes.TIMESTAMP_LTZ(precision); - } - } - } + final ResolvedSchema schema = lookupResult.getResolvedSchema(); // The following block is a workaround to support tables defined by // TableEnvironment.connect() and @@ -159,9 +129,10 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable { // It should be removed after we remove DefinedProctimeAttribute/DefinedRowtimeAttributes. 
Optional<TableSource<?>> sourceOpt = findAndCreateTableSource(); if (isStreamingMode - && tableSchema.getTableColumns().stream().allMatch(TableColumn::isPhysical) - && tableSchema.getWatermarkSpecs().isEmpty() + && schema.getColumns().stream().allMatch(Column::isPhysical) + && schema.getWatermarkSpecs().isEmpty() && sourceOpt.isPresent()) { + TableSchema tableSchema = TableSchema.fromResolvedSchema(schema); TableSource<?> source = sourceOpt.get(); if (TableSourceValidation.hasProctimeAttribute(source) || TableSourceValidation.hasRowtimeAttribute(source)) { @@ -171,10 +142,17 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable { // ConnectorCatalogTable#calculateSourceSchema tableSchema = ConnectorCatalogTable.calculateSourceSchema(source, false); } + return TableSourceUtil.getSourceRowType( + flinkTypeFactory, tableSchema, scala.Option.empty(), true); } - return TableSourceUtil.getSourceRowType( - flinkTypeFactory, tableSchema, scala.Option.empty(), isStreamingMode); + final List<String> fieldNames = schema.getColumnNames(); + final List<LogicalType> fieldTypes = + schema.getColumnDataTypes().stream() + .map(DataType::getLogicalType) + .map(PlannerTypeUtils::removeLegacyTypes) + .collect(Collectors.toList()); + return flinkTypeFactory.buildRelNodeRowType(fieldNames, fieldTypes); } @Override diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java index b732265..6222b23 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java @@ -20,13 +20,12 @@ package org.apache.flink.table.planner.connectors; import org.apache.flink.annotation.Internal; import 
org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.table.api.TableColumn; -import org.apache.flink.table.api.TableColumn.MetadataColumn; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableResult; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.Column.MetadataColumn; import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogTable; @@ -184,7 +183,7 @@ public final class DynamicSinkUtils { final DataTypeFactory dataTypeFactory = unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory(); final FlinkTypeFactory typeFactory = unwrapTypeFactory(relBuilder); - final TableSchema schema = table.getSchema(); + final ResolvedSchema schema = table.getResolvedSchema(); List<SinkAbilitySpec> sinkAbilitySpecs = new ArrayList<>(); @@ -230,7 +229,7 @@ public final class DynamicSinkUtils { */ public static RelNode validateSchemaAndApplyImplicitCast( RelNode query, - TableSchema sinkSchema, + ResolvedSchema sinkSchema, @Nullable ObjectIdentifier sinkIdentifier, DataTypeFactory dataTypeFactory, FlinkTypeFactory typeFactory) { @@ -239,7 +238,7 @@ public final class DynamicSinkUtils { final RowType sinkType = (RowType) - fixSinkDataType(dataTypeFactory, sinkSchema.toPersistedRowDataType()) + fixSinkDataType(dataTypeFactory, sinkSchema.toSinkRowDataType()) .getLogicalType(); final List<RowField> sinkFields = sinkType.getFields(); @@ -284,10 +283,10 @@ public final class DynamicSinkUtils { private static void pushMetadataProjection( FlinkRelBuilder relBuilder, FlinkTypeFactory typeFactory, - TableSchema schema, + ResolvedSchema schema, DynamicTableSink sink) { final RexBuilder rexBuilder = relBuilder.getRexBuilder(); - 
final List<TableColumn> tableColumns = schema.getTableColumns(); + final List<Column> columns = schema.getColumns(); final List<Integer> physicalColumns = extractPhysicalColumns(schema); @@ -297,9 +296,9 @@ public final class DynamicSinkUtils { Collectors.toMap( pos -> { final MetadataColumn metadataColumn = - (MetadataColumn) tableColumns.get(pos); + (MetadataColumn) columns.get(pos); return metadataColumn - .getMetadataAlias() + .getMetadataKey() .orElse(metadataColumn.getName()); }, Function.identity())); @@ -311,13 +310,11 @@ public final class DynamicSinkUtils { final List<String> fieldNames = Stream.concat( - physicalColumns.stream() - .map(tableColumns::get) - .map(TableColumn::getName), + physicalColumns.stream().map(columns::get).map(Column::getName), metadataColumns.stream() - .map(tableColumns::get) + .map(columns::get) .map(MetadataColumn.class::cast) - .map(c -> c.getMetadataAlias().orElse(c.getName()))) + .map(c -> c.getMetadataKey().orElse(c.getName()))) .collect(Collectors.toList()); final Map<String, DataType> metadataMap = extractMetadataMap(sink); @@ -328,18 +325,17 @@ public final class DynamicSinkUtils { .map( pos -> { final int posAdjusted = - adjustByVirtualColumns( - tableColumns, pos); + adjustByVirtualColumns(columns, pos); return relBuilder.field(posAdjusted); }), metadataColumns.stream() .map( pos -> { final MetadataColumn metadataColumn = - (MetadataColumn) tableColumns.get(pos); + (MetadataColumn) columns.get(pos); final String metadataKey = metadataColumn - .getMetadataAlias() + .getMetadataKey() .orElse( metadataColumn .getName()); @@ -354,8 +350,7 @@ public final class DynamicSinkUtils { expectedType); final int posAdjusted = - adjustByVirtualColumns( - tableColumns, pos); + adjustByVirtualColumns(columns, pos); return rexBuilder.makeAbstractCast( expectedRelDataType, relBuilder.field(posAdjusted)); @@ -374,13 +369,13 @@ public final class DynamicSinkUtils { Map<String, String> staticPartitions, boolean isOverwrite, DynamicTableSink 
sink, - CatalogTable table, + ResolvedCatalogTable table, List<SinkAbilitySpec> sinkAbilitySpecs) { validatePartitioning(sinkIdentifier, staticPartitions, sink, table.getPartitionKeys()); validateAndApplyOverwrite(sinkIdentifier, isOverwrite, sink, sinkAbilitySpecs); - validateAndApplyMetadata(sinkIdentifier, sink, table.getSchema(), sinkAbilitySpecs); + validateAndApplyMetadata(sinkIdentifier, sink, table.getResolvedSchema(), sinkAbilitySpecs); } /** @@ -391,15 +386,15 @@ public final class DynamicSinkUtils { * #prepareDynamicSink}. */ private static List<String> createRequiredMetadataKeys( - TableSchema schema, DynamicTableSink sink) { - final List<TableColumn> tableColumns = schema.getTableColumns(); + ResolvedSchema schema, DynamicTableSink sink) { + final List<Column> tableColumns = schema.getColumns(); final List<Integer> metadataColumns = extractPersistedMetadataColumns(schema); final Set<String> requiredMetadataKeys = metadataColumns.stream() .map(tableColumns::get) .map(MetadataColumn.class::cast) - .map(c -> c.getMetadataAlias().orElse(c.getName())) + .map(c -> c.getMetadataKey().orElse(c.getName())) .collect(Collectors.toSet()); final Map<String, DataType> metadataMap = extractMetadataMap(sink); @@ -506,33 +501,29 @@ public final class DynamicSinkUtils { sinkAbilitySpecs.add(new OverwriteSpec(true)); } - private static List<Integer> extractPhysicalColumns(TableSchema schema) { - final List<TableColumn> tableColumns = schema.getTableColumns(); - return IntStream.range(0, schema.getFieldCount()) - .filter(pos -> tableColumns.get(pos).isPhysical()) + private static List<Integer> extractPhysicalColumns(ResolvedSchema schema) { + final List<Column> columns = schema.getColumns(); + return IntStream.range(0, schema.getColumnCount()) + .filter(pos -> columns.get(pos).isPhysical()) .boxed() .collect(Collectors.toList()); } - private static List<Integer> extractPersistedMetadataColumns(TableSchema schema) { - final List<TableColumn> tableColumns = 
schema.getTableColumns(); - return IntStream.range(0, schema.getFieldCount()) + private static List<Integer> extractPersistedMetadataColumns(ResolvedSchema schema) { + final List<Column> columns = schema.getColumns(); + return IntStream.range(0, schema.getColumnCount()) .filter( pos -> { - final TableColumn tableColumn = tableColumns.get(pos); - return tableColumn instanceof MetadataColumn - && tableColumn.isPersisted(); + final Column column = columns.get(pos); + return column instanceof MetadataColumn && column.isPersisted(); }) .boxed() .collect(Collectors.toList()); } - private static int adjustByVirtualColumns(List<TableColumn> tableColumns, int pos) { + private static int adjustByVirtualColumns(List<Column> columns, int pos) { return pos - - (int) - IntStream.range(0, pos) - .filter(i -> !tableColumns.get(i).isPersisted()) - .count(); + - (int) IntStream.range(0, pos).filter(i -> !columns.get(i).isPersisted()).count(); } private static Map<String, DataType> extractMetadataMap(DynamicTableSink sink) { @@ -545,9 +536,9 @@ public final class DynamicSinkUtils { private static void validateAndApplyMetadata( ObjectIdentifier sinkIdentifier, DynamicTableSink sink, - TableSchema schema, + ResolvedSchema schema, List<SinkAbilitySpec> sinkAbilitySpecs) { - final List<TableColumn> tableColumns = schema.getTableColumns(); + final List<Column> columns = schema.getColumns(); final List<Integer> metadataColumns = extractPersistedMetadataColumns(schema); if (metadataColumns.isEmpty()) { @@ -569,10 +560,10 @@ public final class DynamicSinkUtils { ((SupportsWritingMetadata) sink).listWritableMetadata(); metadataColumns.forEach( pos -> { - final MetadataColumn metadataColumn = (MetadataColumn) tableColumns.get(pos); + final MetadataColumn metadataColumn = (MetadataColumn) columns.get(pos); final String metadataKey = - metadataColumn.getMetadataAlias().orElse(metadataColumn.getName()); - final LogicalType metadataType = metadataColumn.getType().getLogicalType(); + 
metadataColumn.getMetadataKey().orElse(metadataColumn.getName()); + final LogicalType metadataType = metadataColumn.getDataType().getLogicalType(); final DataType expectedMetadataDataType = metadataMap.get(metadataKey); // check that metadata key is valid if (expectedMetadataDataType == null) { @@ -626,13 +617,13 @@ public final class DynamicSinkUtils { * * <p>The format looks as follows: {@code PHYSICAL COLUMNS + PERSISTED METADATA COLUMNS} */ - private static RowType createConsumedType(TableSchema schema, DynamicTableSink sink) { + private static RowType createConsumedType(ResolvedSchema schema, DynamicTableSink sink) { final Map<String, DataType> metadataMap = extractMetadataMap(sink); final Stream<RowField> physicalFields = - schema.getTableColumns().stream() - .filter(TableColumn::isPhysical) - .map(c -> new RowField(c.getName(), c.getType().getLogicalType())); + schema.getColumns().stream() + .filter(Column::isPhysical) + .map(c -> new RowField(c.getName(), c.getDataType().getLogicalType())); final Stream<RowField> metadataFields = createRequiredMetadataKeys(schema, sink).stream() diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java index 800eca6..4818887 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java @@ -27,6 +27,7 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.Column.ComputedColumn; import org.apache.flink.table.catalog.Column.MetadataColumn; import 
org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogTable; @@ -239,7 +240,7 @@ public final class DynamicSourceUtils { boolean changeEventsDuplicate = config.getConfiguration() .getBoolean(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE); - boolean hasPrimaryKey = catalogTable.getSchema().getPrimaryKey().isPresent(); + boolean hasPrimaryKey = catalogTable.getResolvedSchema().getPrimaryKey().isPresent(); return isCDCSource && changeEventsDuplicate && hasPrimaryKey; } @@ -268,9 +269,8 @@ public final class DynamicSourceUtils { schema.getColumns().stream() .map( c -> { - if (c instanceof Column.ComputedColumn) { - final Column.ComputedColumn computedColumn = - (Column.ComputedColumn) c; + if (c instanceof ComputedColumn) { + final ComputedColumn computedColumn = (ComputedColumn) c; return computedColumn.getExpression().accept(converter); } else { return relBuilder.field(c.getName()); @@ -296,13 +296,13 @@ public final class DynamicSourceUtils { final List<String> fieldNames = schema.getColumns().stream() - .filter(c -> !(c instanceof Column.ComputedColumn)) + .filter(c -> !(c instanceof ComputedColumn)) .map(Column::getName) .collect(Collectors.toList()); final List<RexNode> fieldNodes = schema.getColumns().stream() - .filter(c -> !(c instanceof Column.ComputedColumn)) + .filter(c -> !(c instanceof ComputedColumn)) .map( c -> { final RelDataType relDataType = diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java index 81a8c49..c95adfd 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java +++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java @@ -41,6 +41,7 @@ import org.apache.flink.table.types.logical.TimeType; import org.apache.calcite.avatica.util.ByteString; import org.apache.calcite.avatica.util.TimeUnit; import org.apache.calcite.avatica.util.TimeUnitRange; +import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; @@ -212,7 +213,19 @@ public class ExpressionConverter implements ExpressionVisitor<RexNode> { if (other instanceof RexNodeExpression) { return ((RexNodeExpression) other).getRexNode(); } else if (other instanceof LocalReferenceExpression) { - LocalReferenceExpression local = (LocalReferenceExpression) other; + final LocalReferenceExpression local = (LocalReferenceExpression) other; + // check whether the local field reference can actually be resolved to an existing + // field otherwise preserve the locality attribute + RelNode inputNode; + try { + inputNode = relBuilder.peek(); + } catch (Throwable t) { + inputNode = null; + } + if (inputNode != null + && inputNode.getRowType().getFieldNames().contains(local.getName())) { + return relBuilder.field(local.getName()); + } return new RexFieldVariable( local.getName(), typeFactory.createFieldTypeFromLogicalType( diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index cb700b6..90a0960 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -35,8 +35,9 @@ import 
org.apache.flink.streaming.api.transformations.SinkTransformation; import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.ParallelismProvider; import org.apache.flink.table.connector.sink.DataStreamSinkProvider; @@ -52,7 +53,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator; import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec; import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; -import org.apache.flink.table.planner.sinks.TableSinkUtils; import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer; @@ -60,16 +60,18 @@ import org.apache.flink.table.runtime.operators.sink.SinkOperator; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.types.RowKind; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static 
org.apache.flink.util.Preconditions.checkArgument; @@ -114,8 +116,9 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> final DynamicTableSink tableSink = tableSinkSpec.getTableSink(); final DynamicTableSink.SinkRuntimeProvider runtimeProvider = tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)); - final TableSchema tableSchema = tableSinkSpec.getCatalogTable().getSchema(); - inputTransform = applyNotNullEnforcer(tableConfig, tableSchema, inputTransform); + final ResolvedSchema schema = tableSinkSpec.getCatalogTable().getResolvedSchema(); + final RowType consumedRowType = getConsumedRowType(schema); + inputTransform = applyNotNullEnforcer(tableConfig, consumedRowType, inputTransform); if (runtimeProvider instanceof DataStreamSinkProvider) { if (runtimeProvider instanceof ParallelismProvider) { @@ -147,7 +150,11 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> // apply keyBy partition transformation if needed inputTransform = applyKeyByForDifferentParallelism( - tableSchema, inputTransform, inputParallelism, sinkParallelism); + consumedRowType, + schema.getPrimaryKey().orElse(null), + inputTransform, + inputParallelism, + sinkParallelism); final SinkFunction<RowData> sinkFunction; if (runtimeProvider instanceof SinkFunctionProvider) { @@ -179,15 +186,12 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> * Apply an operator to filter or report error to process not-null values for not-null fields. 
*/ private Transformation<RowData> applyNotNullEnforcer( - TableConfig config, TableSchema tableSchema, Transformation<RowData> inputTransform) { + TableConfig config, RowType consumedRowType, Transformation<RowData> inputTransform) { final ExecutionConfigOptions.NotNullEnforcer notNullEnforcer = config.getConfiguration() .get(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER); - final int[] notNullFieldIndices = TableSinkUtils.getNotNullFieldIndices(tableSchema); - final String[] fieldNames = - ((RowType) tableSchema.toPhysicalRowDataType().getLogicalType()) - .getFieldNames() - .toArray(new String[0]); + final int[] notNullFieldIndices = getNotNullFieldIndices(consumedRowType); + final String[] fieldNames = consumedRowType.getFieldNames().toArray(new String[0]); if (notNullFieldIndices.length > 0) { final SinkNotNullEnforcer enforcer = @@ -211,6 +215,12 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> } } + private int[] getNotNullFieldIndices(RowType consumedType) { + return IntStream.range(0, consumedType.getFieldCount()) + .filter(pos -> !consumedType.getTypeAt(pos).isNullable()) + .toArray(); + } + /** * Returns the parallelism of sink operator, it assumes the sink runtime provider implements * {@link ParallelismProvider}. It returns parallelism defined in {@link ParallelismProvider} if @@ -242,11 +252,12 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> * ordering of changelog messages. 
*/ private Transformation<RowData> applyKeyByForDifferentParallelism( - TableSchema tableSchema, + RowType sinkRowType, + @Nullable UniqueConstraint primaryKey, Transformation<RowData> inputTransform, int inputParallelism, int sinkParallelism) { - final int[] primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(tableSchema); + final int[] primaryKeys = getPrimaryKeyIndices(sinkRowType, primaryKey); if (inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT)) { // if the inputParallelism is equals to the parallelism or insert-only mode, do nothing. return inputTransform; @@ -276,6 +287,13 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> } } + private int[] getPrimaryKeyIndices(RowType sinkRowType, @Nullable UniqueConstraint primaryKey) { + if (primaryKey == null) { + return new int[0]; + } + return primaryKey.getColumns().stream().mapToInt(sinkRowType::getFieldIndex).toArray(); + } + private Transformation<Object> createSinkFunctionTransformation( SinkFunction<RowData> sinkFunction, StreamExecutionEnvironment env, @@ -299,4 +317,8 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> private InternalTypeInfo<RowData> getInputTypeInfo() { return InternalTypeInfo.of(getInputEdges().get(0).getOutputType()); } + + private RowType getConsumedRowType(ResolvedSchema schema) { + return (RowType) schema.toSinkRowDataType().getLogicalType(); + } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java index f9d526b..cf52dd8 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.utils; import 
org.apache.flink.annotation.Internal; +import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.planner.calcite.FlinkContext; @@ -88,6 +89,13 @@ public final class ShortcutUtils { public static @Nullable FunctionDefinition unwrapFunctionDefinition( ResolvedExpression expression) { + // Table API expression + if (expression instanceof CallExpression) { + final CallExpression callExpression = (CallExpression) expression; + return callExpression.getFunctionDefinition(); + } + + // SQL expression if (!(expression instanceof RexNodeExpression)) { return null; } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala index 24a5b50..18ce788 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala @@ -225,6 +225,22 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) } /** + * Creates a table row type with the given field names and field types. Table row type is table + * schema for Calcite [[RelNode]]. See [[RelNode#getRowType]]. + * + * It uses [[StructKind#FULLY_QUALIFIED]] to let each field must be referenced explicitly. + * + * @param fieldNames field names + * @param fieldTypes field types, every element is Flink's [[LogicalType]] + * @return a table row type with the input fieldNames, input fieldTypes. 
+ */ + def buildRelNodeRowType( + fieldNames: util.List[String], + fieldTypes: util.List[LogicalType]): RelDataType = { + buildStructType(fieldNames, fieldTypes, StructKind.FULLY_QUALIFIED) + } + + /** * Creates a table row type with the input fieldNames and input fieldTypes using * FlinkTypeFactory. Table row type is table schema for Calcite RelNode. See getRowType of * [[RelNode]]. Use FULLY_QUALIFIED to let each field must be referenced explicitly. diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index e676771..c16ab9e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -51,20 +51,18 @@ import org.apache.flink.table.planner.utils.InternalConfigOptions.{TABLE_QUERY_S import org.apache.flink.table.planner.utils.JavaScalaConversionUtil import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter -import org.apache.flink.table.utils.TableSchemaUtils import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema import org.apache.calcite.plan.{RelTrait, RelTraitDef} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.tools.FrameworkConfig import java.lang.{Long => JLong} import java.util import java.util.TimeZone -import org.apache.calcite.rel.hint.RelHint - import _root_.scala.collection.JavaConversions._ /** @@ -182,7 +180,7 @@ abstract class PlannerBase( // validate query schema and sink schema, and apply cast if possible val query = validateSchemaAndApplyImplicitCast( input, - sinkSchema, + 
catalogManager.getSchemaResolver.resolve(sinkSchema.toSchema), null, dataTypeFactory, getTypeFactory) @@ -211,7 +209,7 @@ abstract class PlannerBase( // validate query schema and sink schema, and apply cast if possible val query = validateSchemaAndApplyImplicitCast( input, - TableSchemaUtils.getPhysicalSchema(table.getSchema), + table.getResolvedSchema, catalogSink.getTableIdentifier, dataTypeFactory, getTypeFactory) @@ -256,7 +254,7 @@ abstract class PlannerBase( // validate query schema and sink schema, and apply cast if possible val query = validateSchemaAndApplyImplicitCast( input, - sinkPhysicalSchema, + catalogManager.getSchemaResolver.resolve(sinkPhysicalSchema.toSchema), null, dataTypeFactory, getTypeFactory) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala index c0e8a76..ae3828a 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala @@ -252,7 +252,7 @@ class LegacyCatalogSourceTable[T]( f.getType } } - factory.buildRelNodeRowType(fieldNames, fieldTypes) + factory.buildRelNodeRowType(fieldNames.asScala, fieldTypes) } } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala index 87757e8..3ea44f4 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala @@ -269,15 
+269,4 @@ object TableSinkUtils { false) } } - - /** - * Gets the NOT NULL physical field indices on the [[CatalogTable]]. - */ - def getNotNullFieldIndices(tableSchema: TableSchema): Array[Int] = { - val rowType = tableSchema.toPhysicalRowDataType.getLogicalType.asInstanceOf[RowType] - val fieldTypes = rowType.getFields.map(_.getType).toArray - fieldTypes.indices.filter { index => - !fieldTypes(index).isNullable - }.toArray - } } diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java index 2f0566a..386c43e 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java @@ -68,6 +68,8 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import static org.apache.flink.table.api.Expressions.$; +import static org.apache.flink.table.api.Expressions.sourceWatermark; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; @@ -252,6 +254,7 @@ public class DataStreamJavaITCase extends AbstractTestBase { dataStream, Schema.newBuilder() .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)") + // uses SQL expressions .watermark("rowtime", "SOURCE_WATERMARK()") .build()); @@ -313,12 +316,14 @@ public class DataStreamJavaITCase extends AbstractTestBase { changelogStream, Schema.newBuilder() .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3)) - .watermark("rowtime", "SOURCE_WATERMARK()") + // uses Table API expressions + .columnByExpression("computed", $("f1").upperCase()) + .watermark("rowtime", sourceWatermark()) .build()); 
tableEnv.createTemporaryView("t", table); // access and reorder columns - final Table reordered = tableEnv.sqlQuery("SELECT f1, rowtime, f0 FROM t"); + final Table reordered = tableEnv.sqlQuery("SELECT computed, rowtime, f0 FROM t"); // write out the rowtime column with fully declared schema final DataStream<Row> result = @@ -327,6 +332,8 @@ public class DataStreamJavaITCase extends AbstractTestBase { Schema.newBuilder() .column("f1", DataTypes.STRING()) .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3)) + // uses Table API expressions + .columnByExpression("ignored", $("f1").upperCase()) .column("f0", DataTypes.INT()) .build()); @@ -343,9 +350,9 @@ public class DataStreamJavaITCase extends AbstractTestBase { out.collect(Row.of(key, sum)); }) .returns(Types.ROW(Types.STRING, Types.INT)), - Row.of("a", 47), - Row.of("c", 1000), - Row.of("c", 1000)); + Row.of("A", 47), + Row.of("C", 1000), + Row.of("C", 1000)); } @Test