This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new cc6258c [FLINK-18378] Improve CatalogTable schema resolution cc6258c is described below commit cc6258c2876f76b68d9516508325af03a3e39c3e Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Fri Jun 19 15:53:08 2020 +0200 [FLINK-18378] Improve CatalogTable schema resolution This closes #12725 --- .../api/internal/CatalogTableSchemaResolver.java | 35 ++-- .../table/api/internal/TableEnvironmentImpl.java | 7 +- .../apache/flink/table/catalog/CatalogManager.java | 42 ++-- .../flink/table/catalog/CatalogTableImpl.java | 8 - .../apache/flink/table/catalog/CatalogTest.java | 2 +- .../table/planner/catalog/CatalogSchemaTable.java | 61 +++--- .../planner/catalog/DatabaseCalciteSchema.java | 5 +- .../planner/catalog/JavaCatalogTableTest.java | 211 +++++++++++++++++++++ .../plan/FlinkCalciteCatalogReaderTest.java | 9 +- .../table/planner/catalog/JavaCatalogTableTest.xml | 153 +++++++++++++++ .../flink/table/catalog/DatabaseCalciteSchema.java | 56 ++++-- .../catalog/QueryOperationCatalogViewTable.java | 9 +- .../flink/table/api/internal/TableEnvImpl.scala | 2 +- 13 files changed, 490 insertions(+), 110 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java index 3e1e08a..e4d011f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java @@ -27,8 +27,10 @@ import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; import org.apache.flink.table.types.logical.TimestampKind; import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; import org.apache.flink.table.types.utils.TypeConversions; /** @@ -76,17 +78,17 @@ public class CatalogTableSchemaResolver { for (int i = 0; i < tableSchema.getFieldCount(); ++i) { TableColumn tableColumn = tableSchema.getTableColumns().get(i); DataType fieldType = fieldTypes[i]; - if (tableColumn.isGenerated() && isProctimeType(tableColumn.getExpr().get(), tableSchema)) { - if (fieldNames[i].equals(rowtime)) { - throw new TableException("Watermark can not be defined for a processing time attribute column."); + + if (tableColumn.isGenerated()) { + fieldType = resolveExpressionDataType(tableColumn.getExpr().get(), tableSchema); + if (isProctime(fieldType)) { + if (fieldNames[i].equals(rowtime)) { + throw new TableException("Watermark can not be defined for a processing time attribute column."); + } } - TimestampType originalType = (TimestampType) fieldType.getLogicalType(); - LogicalType proctimeType = new TimestampType( - originalType.isNullable(), - TimestampKind.PROCTIME, - originalType.getPrecision()); - fieldType = TypeConversions.fromLogicalToDataType(proctimeType); - } else if (isStreamingMode && fieldNames[i].equals(rowtime)) { + } + + if (isStreamingMode && fieldNames[i].equals(rowtime)) { TimestampType originalType = (TimestampType) fieldType.getLogicalType(); LogicalType rowtimeType = new TimestampType( originalType.isNullable(), @@ -94,6 +96,7 @@ public class CatalogTableSchemaResolver { originalType.getPrecision()); fieldType = TypeConversions.fromLogicalToDataType(rowtimeType); } + if (tableColumn.isGenerated()) { builder.field(fieldNames[i], fieldType, tableColumn.getExpr().get()); } else { @@ -107,12 +110,16 @@ public class CatalogTableSchemaResolver { return builder.build(); } - private boolean isProctimeType(String expr, TableSchema tableSchema) { + private boolean isProctime(DataType exprType) { + return LogicalTypeChecks.hasFamily(exprType.getLogicalType(), LogicalTypeFamily.TIMESTAMP) && + LogicalTypeChecks.isProctimeAttribute(exprType.getLogicalType()); + } + + private DataType resolveExpressionDataType(String expr, TableSchema tableSchema) { ResolvedExpression resolvedExpr = parser.parseSqlExpression(expr, tableSchema); if (resolvedExpr == null) { - return false; + throw new ValidationException("Could not resolve field expression: " + expr); } - LogicalType type = resolvedExpr.getOutputDataType().getLogicalType(); - return type instanceof TimestampType && ((TimestampType) type).getKind() == TimestampKind.PROCTIME; + return resolvedExpr.getOutputDataType(); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index a95a3f4..95deb43 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -519,9 +519,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { private Optional<CatalogQueryOperation> scanInternal(UnresolvedIdentifier identifier) { ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier); - return catalogManager.getTable(tableIdentifier).map(t -> { - return new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema()); - }); + return catalogManager.getTable(tableIdentifier) + .map(t -> new CatalogQueryOperation(tableIdentifier, t.getResolvedSchema())); } @Override @@ -1062,7 +1061,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { Optional<CatalogManager.TableLookupResult> result = catalogManager.getTable(describeTableOperation.getSqlIdentifier()); if (result.isPresent()) { - return buildDescribeResult(result.get().getTable().getSchema()); + return buildDescribeResult(result.get().getResolvedSchema()); } else { throw new ValidationException(String.format( "Tables or views with the identifier '%s' doesn't exist", diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 9c1e1d3..f80efd1 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -19,6 +19,7 @@ package org.apache.flink.table.catalog; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.CatalogNotExistException; @@ -323,18 +324,25 @@ public final class CatalogManager { public static class TableLookupResult { private final boolean isTemporary; private final CatalogBaseTable table; + private final TableSchema resolvedSchema; - private static TableLookupResult temporary(CatalogBaseTable table) { - return new TableLookupResult(true, table); + @VisibleForTesting + public static TableLookupResult temporary(CatalogBaseTable table, TableSchema resolvedSchema) { + return new TableLookupResult(true, table, resolvedSchema); } - private static TableLookupResult permanent(CatalogBaseTable table) { - return new TableLookupResult(false, table); + @VisibleForTesting + public static TableLookupResult permanent(CatalogBaseTable table, TableSchema resolvedSchema) { + return new TableLookupResult(false, table, resolvedSchema); } - private TableLookupResult(boolean isTemporary, CatalogBaseTable table) { + private TableLookupResult( + boolean isTemporary, + CatalogBaseTable table, + TableSchema resolvedSchema) { this.isTemporary = isTemporary; this.table = table; + this.resolvedSchema = resolvedSchema; } public boolean isTemporary() { @@ -344,6 +352,10 @@ public final class CatalogManager { public CatalogBaseTable getTable() { return table; } + + public TableSchema getResolvedSchema() { + return resolvedSchema; + } } /** @@ -357,21 +369,15 @@ public final class CatalogManager { Preconditions.checkNotNull(schemaResolver, "schemaResolver should not be null"); CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier); if (temporaryTable != null) { - return Optional.of(TableLookupResult.temporary(resolveTableSchema(temporaryTable))); + TableSchema resolvedSchema = resolveTableSchema(temporaryTable); + return Optional.of(TableLookupResult.temporary(temporaryTable, resolvedSchema)); } else { - Optional<TableLookupResult> result = getPermanentTable(objectIdentifier); - return result.map(tableLookupResult -> - TableLookupResult.permanent(resolveTableSchema(tableLookupResult.getTable()))); + return getPermanentTable(objectIdentifier); } } - private CatalogBaseTable resolveTableSchema(CatalogBaseTable table) { - if (!(table instanceof CatalogTableImpl)) { - return table; - } - CatalogTableImpl catalogTableImpl = (CatalogTableImpl) table; - TableSchema newTableSchema = schemaResolver.resolve(catalogTableImpl.getSchema()); - return catalogTableImpl.copy(newTableSchema); + private TableSchema resolveTableSchema(CatalogBaseTable table) { + return schemaResolver.resolve(table.getSchema()); } /** @@ -398,7 +404,9 @@ public final class CatalogManager { ObjectPath objectPath = objectIdentifier.toObjectPath(); if (currentCatalog != null) { try { - return Optional.of(TableLookupResult.permanent(currentCatalog.getTable(objectPath))); + CatalogBaseTable catalogTable = currentCatalog.getTable(objectPath); + TableSchema resolvedSchema = resolveTableSchema(catalogTable); + return Optional.of(TableLookupResult.permanent(catalogTable, resolvedSchema)); } catch (TableNotExistException e) { // Ignore. } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java index dfe5d8b..9ac9c5c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java @@ -88,14 +88,6 @@ public class CatalogTableImpl extends AbstractCatalogTable { return new CatalogTableImpl(getSchema(), getPartitionKeys(), options, getComment()); } - public CatalogTable copy(TableSchema tableSchema) { - return new CatalogTableImpl( - tableSchema.copy(), - new ArrayList<>(getPartitionKeys()), - new HashMap<>(getProperties()), - getComment()); - } - /** * Construct a {@link CatalogTableImpl} from complete properties that contains table schema. */ diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java index 2ad212b..ca7bda8 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java @@ -1335,7 +1335,7 @@ public abstract class CatalogTest { @Override public TableSchema getSchema() { - return null; + return TableSchema.builder().build(); } @Override diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java index 23a8d80..ebf53f4 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java @@ -18,16 +18,16 @@ package org.apache.flink.table.planner.catalog; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableColumn; -import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogManager.TableLookupResult; import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.ConnectorCatalogTable; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.factories.TableFactoryUtil; @@ -67,10 +67,9 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable { //~ Instance fields -------------------------------------------------------- private final ObjectIdentifier tableIdentifier; - private final CatalogBaseTable catalogBaseTable; + private final TableLookupResult lookupResult; private final FlinkStatistic statistic; private final boolean isStreamingMode; - private final boolean isTemporary; private final Catalog catalog; //~ Constructors ----------------------------------------------------------- @@ -79,25 +78,22 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable { * Create a CatalogSchemaTable instance. * * @param tableIdentifier Table identifier - * @param catalogBaseTable CatalogBaseTable instance which exists in the catalog + * @param lookupResult A result of catalog lookup * @param statistic Table statistics * @param catalog The catalog which the schema table belongs to * @param isStreaming If the table is for streaming mode - * @param isTemporary If the table is temporary */ public CatalogSchemaTable( ObjectIdentifier tableIdentifier, - CatalogBaseTable catalogBaseTable, + TableLookupResult lookupResult, FlinkStatistic statistic, Catalog catalog, - boolean isStreaming, - boolean isTemporary) { + boolean isStreaming) { this.tableIdentifier = tableIdentifier; - this.catalogBaseTable = catalogBaseTable; + this.lookupResult = lookupResult; this.statistic = statistic; this.catalog = catalog; this.isStreamingMode = isStreaming; - this.isTemporary = isTemporary; } //~ Methods ---------------------------------------------------------------- @@ -111,11 +107,11 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable { } public CatalogBaseTable getCatalogTable() { - return catalogBaseTable; + return lookupResult.getTable(); } public boolean isTemporary() { - return isTemporary; + return lookupResult.isTemporary(); } public boolean isStreamingMode() { @@ -124,23 +120,13 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable { @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { - return getRowType(typeFactory, catalogBaseTable, isStreamingMode); - } - - @Override - public FlinkStatistic getStatistic() { - return statistic; - } - - private RelDataType getRowType(RelDataTypeFactory typeFactory, - CatalogBaseTable catalogBaseTable, - boolean isStreamingMode) { final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory; - TableSchema tableSchema = catalogBaseTable.getSchema(); + TableSchema tableSchema = lookupResult.getResolvedSchema(); final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes(); + CatalogBaseTable catalogTable = lookupResult.getTable(); if (!isStreamingMode - && catalogBaseTable instanceof ConnectorCatalogTable - && ((ConnectorCatalogTable) catalogBaseTable).getTableSource().isPresent()) { + && catalogTable instanceof ConnectorCatalogTable + && ((ConnectorCatalogTable<?, ?>) catalogTable).getTableSource().isPresent()) { // If the table source is bounded, materialize the time attributes to normal TIMESTAMP type. // Now for ConnectorCatalogTable, there is no way to // deduce if it is bounded in the table environment, so the data types in TableSchema @@ -180,10 +166,15 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable { } return TableSourceUtil.getSourceRowType( - flinkTypeFactory, - tableSchema, - scala.Option.empty(), - isStreamingMode); + flinkTypeFactory, + tableSchema, + scala.Option.empty(), + isStreamingMode); + } + + @Override + public FlinkStatistic getStatistic() { + return statistic; } @Override @@ -199,15 +190,15 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable { private Optional<TableSource<?>> findAndCreateTableSource() { Optional<TableSource<?>> tableSource = Optional.empty(); try { - if (catalogBaseTable instanceof CatalogTableImpl) { + if (lookupResult.getTable() instanceof CatalogTable) { // Use an empty config for TableSourceFactoryContextImpl since we can't fetch the // actual TableConfig here. And currently the empty config do not affect the logic. - ReadableConfig config = new TableConfig().getConfiguration(); + ReadableConfig config = new Configuration(); TableSourceFactory.Context context = - new TableSourceFactoryContextImpl(tableIdentifier, (CatalogTable) catalogBaseTable, config); + new TableSourceFactoryContextImpl(tableIdentifier, (CatalogTable) lookupResult.getTable(), config); TableSource<?> source = TableFactoryUtil.findAndCreateTableSource(context); if (source instanceof StreamTableSource) { - if (!isStreamingMode && !((StreamTableSource) source).isBounded()) { + if (!isStreamingMode && !((StreamTableSource<?>) source).isBounded()) { throw new ValidationException("Cannot query on an unbounded source in batch mode, but " + tableIdentifier.asSummaryString() + " is unbounded."); } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java index 66ee213..22ceb0f 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java @@ -78,11 +78,10 @@ class DatabaseCalciteSchema extends FlinkSchema { FlinkStatistic statistic = getStatistic(result.isTemporary(), table, identifier); return new CatalogSchemaTable( identifier, - table, + result, statistic, catalogManager.getCatalog(catalogName).orElseThrow(IllegalStateException::new), - isStreamingMode, - result.isTemporary()); + isStreamingMode); }) .orElse(null); } diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java new file mode 100644 index 0000000..8771a51 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.catalog; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.Tumble; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.planner.utils.TableTestBase; +import org.apache.flink.table.planner.utils.TableTestUtil; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.api.Expressions.$; +import static org.apache.flink.table.api.Expressions.lit; + +/** + * Tests for resolving types of computed columns (including time attributes) of tables from catalog. + */ +@RunWith(Parameterized.class) +public class JavaCatalogTableTest extends TableTestBase { + @Parameterized.Parameters(name = "streamingMode = {0}") + public static Collection<Boolean> parameters() { + return Arrays.asList(true, false); + } + + @Parameterized.Parameter + public boolean isStreamingMode; + + private TableTestUtil getTestUtil() { + if (isStreamingMode) { + return streamTestUtil(new TableConfig()); + } else { + return batchTestUtil(new TableConfig()); + } + } + + @Test + public void testResolvingSchemaOfCustomCatalogTableSql() throws Exception { + TableTestUtil testUtil = getTestUtil(); + TableEnvironment tableEnvironment = testUtil.getTableEnv(); + GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog("in-memory"); + genericInMemoryCatalog.createTable( + new ObjectPath("default", "testTable"), + new CustomCatalogTable(isStreamingMode), + false); + tableEnvironment.registerCatalog("testCatalog", genericInMemoryCatalog); + tableEnvironment.executeSql("CREATE VIEW testTable2 AS SELECT * FROM testCatalog.`default`.testTable"); + + testUtil.verifyPlan( + "SELECT COUNT(*) FROM testTable2 GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE)"); + } + + @Test + public void testResolvingSchemaOfCustomCatalogTableTableApi() throws Exception { + TableTestUtil testUtil = getTestUtil(); + TableEnvironment tableEnvironment = testUtil.getTableEnv(); + GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog("in-memory"); + genericInMemoryCatalog.createTable( + new ObjectPath("default", "testTable"), + new CustomCatalogTable(isStreamingMode), + false); + tableEnvironment.registerCatalog("testCatalog", genericInMemoryCatalog); + + Table table = tableEnvironment.from("testCatalog.`default`.testTable") + .window(Tumble.over(lit(10).minute()).on($("rowtime")).as("w")) + .groupBy($("w")) + .select(lit(1).count()); + testUtil.verifyPlan(table); + } + + @Test + public void testResolvingProctimeOfCustomTableSql() throws Exception { + if (!isStreamingMode) { + // proctime not supported in batch + return; + } + TableTestUtil testUtil = getTestUtil(); + TableEnvironment tableEnvironment = testUtil.getTableEnv(); + GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog("in-memory"); + genericInMemoryCatalog.createTable( + new ObjectPath("default", "testTable"), + new CustomCatalogTable(isStreamingMode), + false); + tableEnvironment.registerCatalog("testCatalog", genericInMemoryCatalog); + + testUtil.verifyPlan("SELECT COUNT(*) FROM testCatalog.`default`.testTable " + + "GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTE)"); + } + + @Test + public void testResolvingProctimeOfCustomTableTableApi() throws Exception { + if (!isStreamingMode) { + // proctime not supported in batch + return; + } + TableTestUtil testUtil = getTestUtil(); + TableEnvironment tableEnvironment = testUtil.getTableEnv(); + GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog("in-memory"); + genericInMemoryCatalog.createTable( + new ObjectPath("default", "testTable"), + new CustomCatalogTable(isStreamingMode), + false); + tableEnvironment.registerCatalog("testCatalog", genericInMemoryCatalog); + + Table table = tableEnvironment.from("testCatalog.`default`.testTable") + .window(Tumble.over(lit(10).minute()).on($("proctime")).as("w")) + .groupBy($("w")) + .select(lit(1).count()); + testUtil.verifyPlan(table); + } + + private static class CustomCatalogTable implements CatalogTable { + + private final boolean isStreamingMode; + + private CustomCatalogTable(boolean isStreamingMode) { + this.isStreamingMode = isStreamingMode; + } + + @Override + public boolean isPartitioned() { + return false; + } + + @Override + public List<String> getPartitionKeys() { + return Collections.emptyList(); + } + + @Override + public CatalogTable copy(Map<String, String> options) { + return this; + } + + @Override + public Map<String, String> toProperties() { + return Collections.emptyMap(); + } + + @Override + public Map<String, String> getProperties() { + Map<String, String> map = new HashMap<>(); + map.put("connector", "values"); + map.put("bounded", Boolean.toString(!isStreamingMode)); + return map; + } + + @Override + public TableSchema getSchema() { + return TableSchema.builder() + .field("count", DataTypes.BIGINT()) + .field("rowtime", DataTypes.TIMESTAMP()) + .field("proctime", DataTypes.TIMESTAMP(), "proctime()") + .watermark("rowtime", "rowtime - INTERVAL '5' SECONDS", DataTypes.TIMESTAMP()) + .build(); + } + + @Override + public String getComment() { + return null; + } + + @Override + public CatalogBaseTable copy() { + return this; + } + + @Override + public Optional<String> getDescription() { + return Optional.empty(); + } + + @Override + public Optional<String> getDetailedDescription() { + return Optional.empty(); + } + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java index 3ec52507..a1a377d 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.plan; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.ConnectorCatalogTable; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; @@ -72,13 +73,15 @@ public class FlinkCalciteCatalogReaderTest { @Test public void testGetFlinkPreparingTableBase() { // Mock CatalogSchemaTable. + TableSchema schema = TableSchema.builder().build(); CatalogSchemaTable mockTable = new CatalogSchemaTable( ObjectIdentifier.of("a", "b", "c"), - ConnectorCatalogTable.source(new TestTableSource(true, TableSchema.builder().build()), true), + CatalogManager.TableLookupResult.permanent(ConnectorCatalogTable.source( + new TestTableSource(true, schema), + true), schema), FlinkStatistic.UNKNOWN(), null, - true, - false); + true); rootSchemaPlus.add(tableMockName, mockTable); Prepare.PreparingTable preparingTable = catalogReader diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml new file mode 100644 index 0000000..821dba7 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml @@ -0,0 +1,153 @@ +<?xml version="1.0" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<Root> + <TestCase name="testResolvingProctimeOfCustomTableSql[streamingMode = true]"> + <Resource name="sql"> + <![CDATA[SELECT COUNT(*) FROM testCatalog.`default`.testTable GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTE)]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(EXPR$0=[$1]) ++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()]) + +- LogicalProject($f0=[$TUMBLE($2, 600000:INTERVAL MINUTE)]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 5000:INTERVAL SECOND)]) + +- LogicalProject(count=[$0], rowtime=[$1], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[testCatalog, default, testTable]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +GroupWindowAggregate(window=[TumblingGroupWindow('w$, proctime, 600000)], select=[COUNT(*) AS EXPR$0]) ++- Exchange(distribution=[single]) + +- Calc(select=[proctime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)]) + +- Calc(select=[count, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[testCatalog, default, testTable]], fields=[count, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testResolvingSchemaOfCustomCatalogTableTableApi[streamingMode = true]"> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(EXPR$0=[$0]) ++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($3)], window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[]) + +- LogicalProject(count=[$0], rowtime=[$1], proctime=[$2], $f3=[1]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 5000:INTERVAL SECOND)]) + +- LogicalProject(count=[$0], rowtime=[$1], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[testCatalog, default, testTable]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +GroupWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 600000)], select=[COUNT($f3) AS EXPR$0]) ++- Exchange(distribution=[single]) + +- Calc(select=[count, rowtime, proctime, 1 AS $f3]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)]) + +- Calc(select=[count, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[testCatalog, default, testTable]], fields=[count, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testResolvingProctimeOfCustomTableTableApi[streamingMode = true]"> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(EXPR$0=[$0]) ++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($3)], window=[TumblingGroupWindow('w, proctime, 600000)], properties=[]) + +- LogicalProject(count=[$0], rowtime=[$1], proctime=[$2], $f3=[1]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 5000:INTERVAL SECOND)]) + +- LogicalProject(count=[$0], rowtime=[$1], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[testCatalog, default, testTable]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +GroupWindowAggregate(window=[TumblingGroupWindow('w, proctime, 600000)], select=[COUNT($f3) AS EXPR$0]) ++- Exchange(distribution=[single]) + +- Calc(select=[count, rowtime, proctime, 1 AS $f3]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)]) + +- Calc(select=[count, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[testCatalog, default, testTable]], fields=[count, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testResolvingSchemaOfCustomCatalogTableSql[streamingMode = false]"> + <Resource name="sql"> + <![CDATA[SELECT COUNT(*) FROM testTable2 GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE)]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(EXPR$0=[$1]) ++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()]) + +- LogicalProject($f0=[$TUMBLE($1, 600000:INTERVAL MINUTE)]) + +- LogicalTableScan(table=[[testCatalog, default, testTable]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +HashWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 600000)], select=[Final_COUNT(count1$0) AS EXPR$0]) ++- Exchange(distribution=[single]) + +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 600000)], select=[Partial_COUNT(*) AS count1$0]) + +- TableSourceScan(table=[[testCatalog, default, testTable, project=[rowtime]]], fields=[rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testResolvingSchemaOfCustomCatalogTableSql[streamingMode = true]"> + <Resource name="sql"> + <![CDATA[SELECT COUNT(*) FROM testTable2 GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE)]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(EXPR$0=[$1]) ++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()]) + +- LogicalProject($f0=[$TUMBLE($1, 600000:INTERVAL MINUTE)]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 5000:INTERVAL SECOND)]) + +- LogicalProject(count=[$0], rowtime=[$1], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[testCatalog, default, testTable]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 600000)], select=[COUNT(*) AS EXPR$0]) ++- Exchange(distribution=[single]) + +- Calc(select=[rowtime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)]) + +- Calc(select=[count, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[testCatalog, default, testTable]], fields=[count, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testResolvingSchemaOfCustomCatalogTableTableApi[streamingMode = false]"> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(EXPR$0=[$0]) ++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($3)], window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[]) + +- LogicalProject(count=[$0], rowtime=[$1], proctime=[PROCTIME()], $f3=[1]) + +- LogicalTableScan(table=[[testCatalog, default, testTable]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +HashWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 600000)], select=[Final_COUNT(count$0) AS EXPR$0]) ++- Exchange(distribution=[single]) + +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 600000)], select=[Partial_COUNT($f3) AS count$0]) + +- Calc(select=[count, rowtime, PROCTIME() AS proctime, 1 AS $f3]) + +- TableSourceScan(table=[[testCatalog, default, testTable]], fields=[count, rowtime]) +]]> + </Resource> + </TestCase> +</Root> diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java index 6226dff..90f2eb6 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java @@ -22,6 +22,7 @@ import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.calcite.FlinkTypeFactory; +import org.apache.flink.table.catalog.CatalogManager.TableLookupResult; import org.apache.flink.table.factories.TableFactory; import org.apache.flink.table.factories.TableFactoryUtil; import org.apache.flink.table.factories.TableSourceFactory; @@ -79,7 +80,6 @@ class DatabaseCalciteSchema implements Schema { ObjectIdentifier identifier = ObjectIdentifier.of(catalogName, databaseName, tableName); return catalogManager.getTable(identifier) .map(result -> { - CatalogBaseTable table = result.getTable(); final TableFactory tableFactory; if (result.isTemporary()) { tableFactory = null; @@ -88,36 +88,49 @@ class DatabaseCalciteSchema implements Schema { .flatMap(Catalog::getTableFactory) .orElse(null); } - return convertTable(identifier, table, tableFactory); + return convertTable(identifier, result, tableFactory); }) .orElse(null); } - private Table convertTable(ObjectIdentifier identifier, CatalogBaseTable table, @Nullable TableFactory tableFactory) { + private Table convertTable(ObjectIdentifier identifier, TableLookupResult lookupResult, @Nullable TableFactory tableFactory) { + CatalogBaseTable table = lookupResult.getTable(); + TableSchema resolvedSchema = lookupResult.getResolvedSchema(); if (table instanceof QueryOperationCatalogView) { - return QueryOperationCatalogViewTable.createCalciteTable(((QueryOperationCatalogView) table)); + return QueryOperationCatalogViewTable.createCalciteTable( + ((QueryOperationCatalogView) table), + resolvedSchema); } else if (table instanceof ConnectorCatalogTable) { - return convertConnectorTable((ConnectorCatalogTable<?, ?>) table); - } else if (table instanceof CatalogTable) { - return convertCatalogTable(identifier, (CatalogTable) table, tableFactory); - } else if (table instanceof CatalogView) { - return convertCatalogView(identifier.getObjectName(), (CatalogView) table); + return convertConnectorTable((ConnectorCatalogTable<?, ?>) table, resolvedSchema); } else { - throw new TableException("Unsupported table type: " + table); + if (table instanceof CatalogTable) { + return convertCatalogTable( + identifier, + (CatalogTable) table, + resolvedSchema, + tableFactory); + } else if (table instanceof CatalogView) { + return convertCatalogView( + identifier.getObjectName(), + (CatalogView) table, + resolvedSchema); + } else { + throw new TableException("Unsupported table type: " + table); + } } } - private Table convertConnectorTable(ConnectorCatalogTable<?, ?> table) { - Optional<TableSourceTable> tableSourceTable = table.getTableSource() + private Table convertConnectorTable(ConnectorCatalogTable<?, ?> table, TableSchema resolvedSchema) { + Optional<TableSourceTable<?>> tableSourceTable = table.getTableSource() .map(tableSource -> new TableSourceTable<>( - table.getSchema(), + resolvedSchema, tableSource, !table.isBatch(), FlinkStatistic.UNKNOWN())); if (tableSourceTable.isPresent()) { return tableSourceTable.get(); } else { - Optional<TableSinkTable> tableSinkTable = table.getTableSink() + Optional<TableSinkTable<?>> tableSinkTable = table.getTableSink() .map(tableSink -> new TableSinkTable<>( tableSink, FlinkStatistic.UNKNOWN())); @@ -130,13 +143,17 @@ class DatabaseCalciteSchema implements Schema { } } - private Table convertCatalogTable(ObjectIdentifier identifier, CatalogTable table, @Nullable TableFactory tableFactory) { + private Table convertCatalogTable( + ObjectIdentifier identifier, + CatalogTable table, + TableSchema resolvedSchema, + @Nullable TableFactory tableFactory) { final TableSource<?> tableSource; final TableSourceFactory.Context context = new TableSourceFactoryContextImpl( identifier, table, tableConfig.getConfiguration()); if (tableFactory != null) { if (tableFactory instanceof TableSourceFactory) { - tableSource = ((TableSourceFactory) tableFactory).createTableSource(context); + tableSource = ((TableSourceFactory<?>) tableFactory).createTableSource(context); } else { throw new TableException( "Cannot query a sink-only table. TableFactory provided by catalog must implement TableSourceFactory"); @@ -150,7 +167,7 @@ class DatabaseCalciteSchema implements Schema { } return new TableSourceTable<>( - table.getSchema(), + resolvedSchema, tableSource, // this means the TableSource extends from StreamTableSource, this is needed for the // legacy Planner. Blink Planner should use the information that comes from the TableSource @@ -160,11 +177,10 @@ class DatabaseCalciteSchema implements Schema { ); } - private Table convertCatalogView(String tableName, CatalogView table) { - TableSchema schema = table.getSchema(); + private Table convertCatalogView(String tableName, CatalogView table, TableSchema resolvedSchema) { return new ViewTable( null, - typeFactory -> ((FlinkTypeFactory) typeFactory).buildLogicalRowType(schema), + typeFactory -> ((FlinkTypeFactory) typeFactory).buildLogicalRowType(resolvedSchema), table.getExpandedQuery(), Arrays.asList(catalogName, databaseName), Arrays.asList(catalogName, databaseName, tableName) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java index dfcced1..bbbb6d4 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java @@ -51,12 +51,13 @@ public class QueryOperationCatalogViewTable extends AbstractTable implements Tra private final QueryOperationCatalogView catalogView; private final RelProtoDataType rowType; - public static QueryOperationCatalogViewTable createCalciteTable(QueryOperationCatalogView catalogView) { + public static QueryOperationCatalogViewTable createCalciteTable( + QueryOperationCatalogView catalogView, + TableSchema resolvedSchema) { return new QueryOperationCatalogViewTable(catalogView, typeFactory -> { - TableSchema tableSchema = catalogView.getSchema(); final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory; - final RelDataType relType = flinkTypeFactory.buildLogicalRowType(tableSchema); - Boolean[] nullables = tableSchema + final RelDataType relType = flinkTypeFactory.buildLogicalRowType(resolvedSchema); + Boolean[] nullables = resolvedSchema .getTableColumns() .stream() .map(c -> c.getType().getLogicalType().isNullable()) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index 4e554f2..db10922 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -482,7 +482,7 @@ abstract class TableEnvImpl( val objectIdentifier: ObjectIdentifier = catalogManager.qualifyIdentifier(identifier) JavaScalaConversionUtil.toScala(catalogManager.getTable(objectIdentifier)) - .map(t => new CatalogQueryOperation(objectIdentifier, t.getTable.getSchema)) + .map(t => new CatalogQueryOperation(objectIdentifier, t.getResolvedSchema)) } override def listModules(): Array[String] = {