This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new cc6258c  [FLINK-18378] Improve CatalogTable schema resolution
cc6258c is described below

commit cc6258c2876f76b68d9516508325af03a3e39c3e
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Fri Jun 19 15:53:08 2020 +0200

    [FLINK-18378] Improve CatalogTable schema resolution
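    
    Schema resolution for catalog tables now happens in the CatalogManager:
    the resolved TableSchema is exposed on TableLookupResult next to the
    original CatalogBaseTable, instead of copying a CatalogTableImpl with a
    rewritten schema. Computed columns and time attributes are therefore
    resolved for any CatalogBaseTable implementation, including
    user-provided ones. A minimal sketch of consuming the new lookup result
    (the identifier values below are hypothetical):
    
        ObjectIdentifier id = ObjectIdentifier.of("myCatalog", "myDb", "myTable");
        catalogManager.getTable(id).ifPresent(result -> {
                // the table exactly as it is stored in the catalog
                CatalogBaseTable table = result.getTable();
                // schema with computed columns and time attributes resolved
                TableSchema schema = result.getResolvedSchema();
        });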
    
    This closes #12725
---
 .../api/internal/CatalogTableSchemaResolver.java   |  35 ++--
 .../table/api/internal/TableEnvironmentImpl.java   |   7 +-
 .../apache/flink/table/catalog/CatalogManager.java |  42 ++--
 .../flink/table/catalog/CatalogTableImpl.java      |   8 -
 .../apache/flink/table/catalog/CatalogTest.java    |   2 +-
 .../table/planner/catalog/CatalogSchemaTable.java  |  61 +++---
 .../planner/catalog/DatabaseCalciteSchema.java     |   5 +-
 .../planner/catalog/JavaCatalogTableTest.java      | 211 +++++++++++++++++++++
 .../plan/FlinkCalciteCatalogReaderTest.java        |   9 +-
 .../table/planner/catalog/JavaCatalogTableTest.xml | 153 +++++++++++++++
 .../flink/table/catalog/DatabaseCalciteSchema.java |  56 ++++--
 .../catalog/QueryOperationCatalogViewTable.java    |   9 +-
 .../flink/table/api/internal/TableEnvImpl.scala    |   2 +-
 13 files changed, 490 insertions(+), 110 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
index 3e1e08a..e4d011f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
@@ -27,8 +27,10 @@ import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.TimestampKind;
 import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.table.types.utils.TypeConversions;
 
 /**
@@ -76,17 +78,17 @@ public class CatalogTableSchemaResolver {
                for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
                        TableColumn tableColumn = tableSchema.getTableColumns().get(i);
                        DataType fieldType = fieldTypes[i];
-                       if (tableColumn.isGenerated() && isProctimeType(tableColumn.getExpr().get(), tableSchema)) {
-                               if (fieldNames[i].equals(rowtime)) {
-                                       throw new TableException("Watermark can not be defined for a processing time attribute column.");
+
+                       if (tableColumn.isGenerated()) {
+                               fieldType = resolveExpressionDataType(tableColumn.getExpr().get(), tableSchema);
+                               if (isProctime(fieldType)) {
+                                       if (fieldNames[i].equals(rowtime)) {
+                                               throw new TableException("Watermark can not be defined for a processing time attribute column.");
+                                       }
                                }
-                               TimestampType originalType = (TimestampType) fieldType.getLogicalType();
-                               LogicalType proctimeType = new TimestampType(
-                                               originalType.isNullable(),
-                                               TimestampKind.PROCTIME,
-                                               originalType.getPrecision());
-                               fieldType = TypeConversions.fromLogicalToDataType(proctimeType);
-                       } else if (isStreamingMode && fieldNames[i].equals(rowtime)) {
+                       }
+
+                       if (isStreamingMode && fieldNames[i].equals(rowtime)) {
                                TimestampType originalType = (TimestampType) fieldType.getLogicalType();
                                LogicalType rowtimeType = new TimestampType(
                                                originalType.isNullable(),
@@ -94,6 +96,7 @@ public class CatalogTableSchemaResolver {
                                                originalType.getPrecision());
                        fieldType = TypeConversions.fromLogicalToDataType(rowtimeType);
                        }
+
                        if (tableColumn.isGenerated()) {
                                builder.field(fieldNames[i], fieldType, tableColumn.getExpr().get());
                        } else {
@@ -107,12 +110,16 @@ public class CatalogTableSchemaResolver {
                return builder.build();
        }
 
-       private boolean isProctimeType(String expr, TableSchema tableSchema) {
+       private boolean isProctime(DataType exprType) {
+               return LogicalTypeChecks.hasFamily(exprType.getLogicalType(), LogicalTypeFamily.TIMESTAMP) &&
+                       LogicalTypeChecks.isProctimeAttribute(exprType.getLogicalType());
+       }
+
+       private DataType resolveExpressionDataType(String expr, TableSchema tableSchema) {
                ResolvedExpression resolvedExpr = parser.parseSqlExpression(expr, tableSchema);
                if (resolvedExpr == null) {
-                       return false;
+                       throw new ValidationException("Could not resolve field expression: " + expr);
                }
-               LogicalType type = resolvedExpr.getOutputDataType().getLogicalType();
-               return type instanceof TimestampType && ((TimestampType) type).getKind() == TimestampKind.PROCTIME;
+               return resolvedExpr.getOutputDataType();
        }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index a95a3f4..95deb43 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -519,9 +519,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
        private Optional<CatalogQueryOperation> scanInternal(UnresolvedIdentifier identifier) {
                ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier);
 
-               return catalogManager.getTable(tableIdentifier).map(t -> {
-                       return new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema());
-               });
+               return catalogManager.getTable(tableIdentifier)
+                       .map(t -> new CatalogQueryOperation(tableIdentifier, t.getResolvedSchema()));
        }
 
        @Override
@@ -1062,7 +1061,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                        Optional<CatalogManager.TableLookupResult> result =
                                        catalogManager.getTable(describeTableOperation.getSqlIdentifier());
                        if (result.isPresent()) {
-                               return buildDescribeResult(result.get().getTable().getSchema());
+                               return buildDescribeResult(result.get().getResolvedSchema());
                        } else {
                                throw new ValidationException(String.format(
                                                "Tables or views with the identifier '%s' doesn't exist",
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 9c1e1d3..f80efd1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.CatalogNotExistException;
@@ -323,18 +324,25 @@ public final class CatalogManager {
        public static class TableLookupResult {
                private final boolean isTemporary;
                private final CatalogBaseTable table;
+               private final TableSchema resolvedSchema;
 
-               private static TableLookupResult temporary(CatalogBaseTable table) {
-                       return new TableLookupResult(true, table);
+               @VisibleForTesting
+               public static TableLookupResult temporary(CatalogBaseTable table, TableSchema resolvedSchema) {
+                       return new TableLookupResult(true, table, resolvedSchema);
                }
 
-               private static TableLookupResult permanent(CatalogBaseTable table) {
-                       return new TableLookupResult(false, table);
+               @VisibleForTesting
+               public static TableLookupResult permanent(CatalogBaseTable table, TableSchema resolvedSchema) {
+                       return new TableLookupResult(false, table, resolvedSchema);
                }
 
-               private TableLookupResult(boolean isTemporary, CatalogBaseTable table) {
+               private TableLookupResult(
+                               boolean isTemporary,
+                               CatalogBaseTable table,
+                               TableSchema resolvedSchema) {
                        this.isTemporary = isTemporary;
                        this.table = table;
+                       this.resolvedSchema = resolvedSchema;
                }
 
                public boolean isTemporary() {
@@ -344,6 +352,10 @@ public final class CatalogManager {
                public CatalogBaseTable getTable() {
                        return table;
                }
+
+               public TableSchema getResolvedSchema() {
+                       return resolvedSchema;
+               }
        }
 
        /**
@@ -357,21 +369,15 @@ public final class CatalogManager {
                Preconditions.checkNotNull(schemaResolver, "schemaResolver should not be null");
                CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
                if (temporaryTable != null) {
-                       return Optional.of(TableLookupResult.temporary(resolveTableSchema(temporaryTable)));
+                       TableSchema resolvedSchema = resolveTableSchema(temporaryTable);
+                       return Optional.of(TableLookupResult.temporary(temporaryTable, resolvedSchema));
                } else {
-                       Optional<TableLookupResult> result = getPermanentTable(objectIdentifier);
-                       return result.map(tableLookupResult ->
-                                       TableLookupResult.permanent(resolveTableSchema(tableLookupResult.getTable())));
+                       return getPermanentTable(objectIdentifier);
                }
        }
 
-       private CatalogBaseTable resolveTableSchema(CatalogBaseTable table) {
-               if (!(table instanceof CatalogTableImpl)) {
-                       return table;
-               }
-               CatalogTableImpl catalogTableImpl = (CatalogTableImpl) table;
-               TableSchema newTableSchema = schemaResolver.resolve(catalogTableImpl.getSchema());
-               return catalogTableImpl.copy(newTableSchema);
+       private TableSchema resolveTableSchema(CatalogBaseTable table) {
+               return schemaResolver.resolve(table.getSchema());
        }
 
        /**
@@ -398,7 +404,9 @@ public final class CatalogManager {
                ObjectPath objectPath = objectIdentifier.toObjectPath();
                if (currentCatalog != null) {
                        try {
-                               return Optional.of(TableLookupResult.permanent(currentCatalog.getTable(objectPath)));
+                               CatalogBaseTable catalogTable = currentCatalog.getTable(objectPath);
+                               TableSchema resolvedSchema = resolveTableSchema(catalogTable);
+                               return Optional.of(TableLookupResult.permanent(catalogTable, resolvedSchema));
                        } catch (TableNotExistException e) {
                                // Ignore.
                        }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
index dfe5d8b..9ac9c5c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
@@ -88,14 +88,6 @@ public class CatalogTableImpl extends AbstractCatalogTable {
                return new CatalogTableImpl(getSchema(), getPartitionKeys(), options, getComment());
        }
 
-       public CatalogTable copy(TableSchema tableSchema) {
-               return new CatalogTableImpl(
-                               tableSchema.copy(),
-                               new ArrayList<>(getPartitionKeys()),
-                               new HashMap<>(getProperties()),
-                               getComment());
-       }
-
        /**
         * Construct a {@link CatalogTableImpl} from complete properties that contains table schema.
         */
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
index 2ad212b..ca7bda8 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
@@ -1335,7 +1335,7 @@ public abstract class CatalogTest {
 
                @Override
                public TableSchema getSchema() {
-                       return null;
+                       return TableSchema.builder().build();
                }
 
                @Override
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
index 23a8d80..ebf53f4 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
@@ -18,16 +18,16 @@
 
 package org.apache.flink.table.planner.catalog;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogManager.TableLookupResult;
 import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ConnectorCatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.factories.TableFactoryUtil;
@@ -67,10 +67,9 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
        //~ Instance fields --------------------------------------------------------
 
        private final ObjectIdentifier tableIdentifier;
-       private final CatalogBaseTable catalogBaseTable;
+       private final TableLookupResult lookupResult;
        private final FlinkStatistic statistic;
        private final boolean isStreamingMode;
-       private final boolean isTemporary;
        private final Catalog catalog;
 
        //~ Constructors -----------------------------------------------------------
@@ -79,25 +78,22 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
         * Create a CatalogSchemaTable instance.
         *
         * @param tableIdentifier Table identifier
-        * @param catalogBaseTable CatalogBaseTable instance which exists in the catalog
+        * @param lookupResult A result of catalog lookup
         * @param statistic Table statistics
         * @param catalog The catalog which the schema table belongs to
         * @param isStreaming If the table is for streaming mode
-        * @param isTemporary If the table is temporary
         */
        public CatalogSchemaTable(
                        ObjectIdentifier tableIdentifier,
-                       CatalogBaseTable catalogBaseTable,
+                       TableLookupResult lookupResult,
                        FlinkStatistic statistic,
                        Catalog catalog,
-                       boolean isStreaming,
-                       boolean isTemporary) {
+                       boolean isStreaming) {
                this.tableIdentifier = tableIdentifier;
-               this.catalogBaseTable = catalogBaseTable;
+               this.lookupResult = lookupResult;
                this.statistic = statistic;
                this.catalog = catalog;
                this.isStreamingMode = isStreaming;
-               this.isTemporary = isTemporary;
        }
 
        //~ Methods ----------------------------------------------------------------
@@ -111,11 +107,11 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
        }
 
        public CatalogBaseTable getCatalogTable() {
-               return catalogBaseTable;
+               return lookupResult.getTable();
        }
 
        public boolean isTemporary() {
-               return isTemporary;
+               return lookupResult.isTemporary();
        }
 
        public boolean isStreamingMode() {
@@ -124,23 +120,13 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
 
        @Override
        public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-               return getRowType(typeFactory, catalogBaseTable, isStreamingMode);
-       }
-
-       @Override
-       public FlinkStatistic getStatistic() {
-               return statistic;
-       }
-
-       private RelDataType getRowType(RelDataTypeFactory typeFactory,
-                       CatalogBaseTable catalogBaseTable,
-                       boolean isStreamingMode) {
                final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory;
-               TableSchema tableSchema = catalogBaseTable.getSchema();
+               TableSchema tableSchema = lookupResult.getResolvedSchema();
                final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
+               CatalogBaseTable catalogTable = lookupResult.getTable();
                if (!isStreamingMode
-                       && catalogBaseTable instanceof ConnectorCatalogTable
-                       && ((ConnectorCatalogTable) catalogBaseTable).getTableSource().isPresent()) {
+                               && catalogTable instanceof ConnectorCatalogTable
+                               && ((ConnectorCatalogTable<?, ?>) catalogTable).getTableSource().isPresent()) {
                        // If the table source is bounded, materialize the time attributes to normal TIMESTAMP type.
                        // Now for ConnectorCatalogTable, there is no way to
                        // deduce if it is bounded in the table environment, so the data types in TableSchema
@@ -180,10 +166,15 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
                }
 
                return TableSourceUtil.getSourceRowType(
-                               flinkTypeFactory,
-                               tableSchema,
-                               scala.Option.empty(),
-                               isStreamingMode);
+                       flinkTypeFactory,
+                       tableSchema,
+                       scala.Option.empty(),
+                       isStreamingMode);
+       }
+
+       @Override
+       public FlinkStatistic getStatistic() {
+               return statistic;
        }
 
        @Override
@@ -199,15 +190,15 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
        private Optional<TableSource<?>> findAndCreateTableSource() {
                Optional<TableSource<?>> tableSource = Optional.empty();
                try {
-                       if (catalogBaseTable instanceof CatalogTableImpl) {
+                       if (lookupResult.getTable() instanceof CatalogTable) {
                                // Use an empty config for TableSourceFactoryContextImpl since we can't fetch the
                                // actual TableConfig here. And currently the empty config do not affect the logic.
-                               ReadableConfig config = new TableConfig().getConfiguration();
+                               ReadableConfig config = new Configuration();
                                TableSourceFactory.Context context =
-                                       new TableSourceFactoryContextImpl(tableIdentifier, (CatalogTable) catalogBaseTable, config);
+                                       new TableSourceFactoryContextImpl(tableIdentifier, (CatalogTable) lookupResult.getTable(), config);
                                TableSource<?> source = TableFactoryUtil.findAndCreateTableSource(context);
                                if (source instanceof StreamTableSource) {
-                                       if (!isStreamingMode && !((StreamTableSource) source).isBounded()) {
+                                       if (!isStreamingMode && !((StreamTableSource<?>) source).isBounded()) {
                                                throw new ValidationException("Cannot query on an unbounded source in batch mode, but " +
                                                        tableIdentifier.asSummaryString() + " is unbounded.");
                                        }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
index 66ee213..22ceb0f 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
@@ -78,11 +78,10 @@ class DatabaseCalciteSchema extends FlinkSchema {
                                FlinkStatistic statistic = getStatistic(result.isTemporary(), table, identifier);
                                return new CatalogSchemaTable(
                                        identifier,
-                                       table,
+                                       result,
                                        statistic,
                                        catalogManager.getCatalog(catalogName).orElseThrow(IllegalStateException::new),
-                                       isStreamingMode,
-                                       result.isTemporary());
+                                       isStreamingMode);
                        })
                        .orElse(null);
        }
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java
new file mode 100644
index 0000000..8771a51
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.catalog;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Tumble;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.lit;
+
+/**
+ * Tests for resolving types of computed columns (including time attributes) of tables from catalog.
+ */
+@RunWith(Parameterized.class)
+public class JavaCatalogTableTest extends TableTestBase {
+       @Parameterized.Parameters(name = "streamingMode = {0}")
+       public static Collection<Boolean> parameters() {
+               return Arrays.asList(true, false);
+       }
+
+       @Parameterized.Parameter
+       public boolean isStreamingMode;
+
+       private TableTestUtil getTestUtil() {
+               if (isStreamingMode) {
+                       return streamTestUtil(new TableConfig());
+               } else {
+                       return batchTestUtil(new TableConfig());
+               }
+       }
+
+       @Test
+       public void testResolvingSchemaOfCustomCatalogTableSql() throws Exception {
+               TableTestUtil testUtil = getTestUtil();
+               TableEnvironment tableEnvironment = testUtil.getTableEnv();
+               GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog("in-memory");
+               genericInMemoryCatalog.createTable(
+                       new ObjectPath("default", "testTable"),
+                       new CustomCatalogTable(isStreamingMode),
+                       false);
+               tableEnvironment.registerCatalog("testCatalog", genericInMemoryCatalog);
+               tableEnvironment.executeSql("CREATE VIEW testTable2 AS SELECT * FROM testCatalog.`default`.testTable");
+
+               testUtil.verifyPlan(
+                       "SELECT COUNT(*) FROM testTable2 GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE)");
+       }
+
+       @Test
+       public void testResolvingSchemaOfCustomCatalogTableTableApi() throws Exception {
+               TableTestUtil testUtil = getTestUtil();
+               TableEnvironment tableEnvironment = testUtil.getTableEnv();
+               GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog("in-memory");
+               genericInMemoryCatalog.createTable(
+                       new ObjectPath("default", "testTable"),
+                       new CustomCatalogTable(isStreamingMode),
+                       false);
+               tableEnvironment.registerCatalog("testCatalog", genericInMemoryCatalog);
+
+               Table table = tableEnvironment.from("testCatalog.`default`.testTable")
+                       .window(Tumble.over(lit(10).minute()).on($("rowtime")).as("w"))
+                       .groupBy($("w"))
+                       .select(lit(1).count());
+               testUtil.verifyPlan(table);
+       }
+
+       @Test
+       public void testResolvingProctimeOfCustomTableSql() throws Exception {
+               if (!isStreamingMode) {
+                       // proctime not supported in batch
+                       return;
+               }
+               TableTestUtil testUtil = getTestUtil();
+               TableEnvironment tableEnvironment = testUtil.getTableEnv();
+               GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog("in-memory");
+               genericInMemoryCatalog.createTable(
+                       new ObjectPath("default", "testTable"),
+                       new CustomCatalogTable(isStreamingMode),
+                       false);
+               tableEnvironment.registerCatalog("testCatalog", genericInMemoryCatalog);
+
+               testUtil.verifyPlan("SELECT COUNT(*) FROM testCatalog.`default`.testTable " +
+                       "GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTE)");
+       }
+
+       @Test
+       public void testResolvingProctimeOfCustomTableTableApi() throws Exception {
+               if (!isStreamingMode) {
+                       // proctime not supported in batch
+                       return;
+               }
+               TableTestUtil testUtil = getTestUtil();
+               TableEnvironment tableEnvironment = testUtil.getTableEnv();
+               GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog("in-memory");
+               genericInMemoryCatalog.createTable(
+                       new ObjectPath("default", "testTable"),
+                       new CustomCatalogTable(isStreamingMode),
+                       false);
+               tableEnvironment.registerCatalog("testCatalog", genericInMemoryCatalog);
+
+               Table table = tableEnvironment.from("testCatalog.`default`.testTable")
+                       .window(Tumble.over(lit(10).minute()).on($("proctime")).as("w"))
+                       .groupBy($("w"))
+                       .select(lit(1).count());
+               testUtil.verifyPlan(table);
+       }
+
+       private static class CustomCatalogTable implements CatalogTable {
+
+               private final boolean isStreamingMode;
+
+               private CustomCatalogTable(boolean isStreamingMode) {
+                       this.isStreamingMode = isStreamingMode;
+               }
+
+               @Override
+               public boolean isPartitioned() {
+                       return false;
+               }
+
+               @Override
+               public List<String> getPartitionKeys() {
+                       return Collections.emptyList();
+               }
+
+               @Override
+               public CatalogTable copy(Map<String, String> options) {
+                       return this;
+               }
+
+               @Override
+               public Map<String, String> toProperties() {
+                       return Collections.emptyMap();
+               }
+
+               @Override
+               public Map<String, String> getProperties() {
+                       Map<String, String> map = new HashMap<>();
+                       map.put("connector", "values");
+                       map.put("bounded", Boolean.toString(!isStreamingMode));
+                       return map;
+               }
+
+               @Override
+               public TableSchema getSchema() {
+                       return TableSchema.builder()
+                               .field("count", DataTypes.BIGINT())
+                               .field("rowtime", DataTypes.TIMESTAMP())
+                               .field("proctime", DataTypes.TIMESTAMP(), "proctime()")
+                               .watermark("rowtime", "rowtime - INTERVAL '5' SECONDS", DataTypes.TIMESTAMP())
+                               .build();
+               }
+
+               @Override
+               public String getComment() {
+                       return null;
+               }
+
+               @Override
+               public CatalogBaseTable copy() {
+                       return this;
+               }
+
+               @Override
+               public Optional<String> getDescription() {
+                       return Optional.empty();
+               }
+
+               @Override
+               public Optional<String> getDetailedDescription() {
+                       return Optional.empty();
+               }
+       }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java
index 3ec52507..a1a377d 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan;
 
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.ConnectorCatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
@@ -72,13 +73,15 @@ public class FlinkCalciteCatalogReaderTest {
        @Test
        public void testGetFlinkPreparingTableBase() {
                // Mock CatalogSchemaTable.
+               TableSchema schema = TableSchema.builder().build();
                CatalogSchemaTable mockTable = new CatalogSchemaTable(
                        ObjectIdentifier.of("a", "b", "c"),
-                       ConnectorCatalogTable.source(new TestTableSource(true, TableSchema.builder().build()), true),
+                       CatalogManager.TableLookupResult.permanent(ConnectorCatalogTable.source(
+                               new TestTableSource(true, schema),
+                               true), schema),
                        FlinkStatistic.UNKNOWN(),
                        null,
-                       true,
-                       false);
+                       true);
 
                rootSchemaPlus.add(tableMockName, mockTable);
                Prepare.PreparingTable preparingTable = catalogReader
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml
new file mode 100644
index 0000000..821dba7
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml
@@ -0,0 +1,153 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testResolvingProctimeOfCustomTableSql[streamingMode = true]">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(*) FROM testCatalog.`default`.testTable GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTE)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()])
+   +- LogicalProject($f0=[$TUMBLE($2, 600000:INTERVAL MINUTE)])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 5000:INTERVAL SECOND)])
+         +- LogicalProject(count=[$0], rowtime=[$1], proctime=[PROCTIME()])
+            +- LogicalTableScan(table=[[testCatalog, default, testTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GroupWindowAggregate(window=[TumblingGroupWindow('w$, proctime, 600000)], select=[COUNT(*) AS EXPR$0])
++- Exchange(distribution=[single])
+   +- Calc(select=[proctime])
+      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)])
+         +- Calc(select=[count, rowtime, PROCTIME() AS proctime])
+            +- TableSourceScan(table=[[testCatalog, default, testTable]], fields=[count, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testResolvingSchemaOfCustomCatalogTableTableApi[streamingMode = true]">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$0])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($3)], window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[])
+   +- LogicalProject(count=[$0], rowtime=[$1], proctime=[$2], $f3=[1])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 5000:INTERVAL SECOND)])
+         +- LogicalProject(count=[$0], rowtime=[$1], proctime=[PROCTIME()])
+            +- LogicalTableScan(table=[[testCatalog, default, testTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GroupWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 600000)], select=[COUNT($f3) AS EXPR$0])
++- Exchange(distribution=[single])
+   +- Calc(select=[count, rowtime, proctime, 1 AS $f3])
+      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)])
+         +- Calc(select=[count, rowtime, PROCTIME() AS proctime])
+            +- TableSourceScan(table=[[testCatalog, default, testTable]], fields=[count, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testResolvingProctimeOfCustomTableTableApi[streamingMode = true]">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$0])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($3)], window=[TumblingGroupWindow('w, proctime, 600000)], properties=[])
+   +- LogicalProject(count=[$0], rowtime=[$1], proctime=[$2], $f3=[1])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 5000:INTERVAL SECOND)])
+         +- LogicalProject(count=[$0], rowtime=[$1], proctime=[PROCTIME()])
+            +- LogicalTableScan(table=[[testCatalog, default, testTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GroupWindowAggregate(window=[TumblingGroupWindow('w, proctime, 600000)], select=[COUNT($f3) AS EXPR$0])
++- Exchange(distribution=[single])
+   +- Calc(select=[count, rowtime, proctime, 1 AS $f3])
+      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)])
+         +- Calc(select=[count, rowtime, PROCTIME() AS proctime])
+            +- TableSourceScan(table=[[testCatalog, default, testTable]], fields=[count, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testResolvingSchemaOfCustomCatalogTableSql[streamingMode = false]">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(*) FROM testTable2 GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()])
+   +- LogicalProject($f0=[$TUMBLE($1, 600000:INTERVAL MINUTE)])
+      +- LogicalTableScan(table=[[testCatalog, default, testTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+HashWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 600000)], select=[Final_COUNT(count1$0) AS EXPR$0])
++- Exchange(distribution=[single])
+   +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 600000)], select=[Partial_COUNT(*) AS count1$0])
+      +- TableSourceScan(table=[[testCatalog, default, testTable, project=[rowtime]]], fields=[rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testResolvingSchemaOfCustomCatalogTableSql[streamingMode = true]">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(*) FROM testTable2 GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()])
+   +- LogicalProject($f0=[$TUMBLE($1, 600000:INTERVAL MINUTE)])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 5000:INTERVAL SECOND)])
+         +- LogicalProject(count=[$0], rowtime=[$1], proctime=[PROCTIME()])
+            +- LogicalTableScan(table=[[testCatalog, default, testTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 600000)], select=[COUNT(*) AS EXPR$0])
++- Exchange(distribution=[single])
+   +- Calc(select=[rowtime])
+      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)])
+         +- Calc(select=[count, rowtime, PROCTIME() AS proctime])
+            +- TableSourceScan(table=[[testCatalog, default, testTable]], fields=[count, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testResolvingSchemaOfCustomCatalogTableTableApi[streamingMode = false]">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$0])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($3)], window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[])
+   +- LogicalProject(count=[$0], rowtime=[$1], proctime=[PROCTIME()], $f3=[1])
+      +- LogicalTableScan(table=[[testCatalog, default, testTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+HashWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 600000)], select=[Final_COUNT(count$0) AS EXPR$0])
++- Exchange(distribution=[single])
+   +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 600000)], select=[Partial_COUNT($f3) AS count$0])
+      +- Calc(select=[count, rowtime, PROCTIME() AS proctime, 1 AS $f3])
+         +- TableSourceScan(table=[[testCatalog, default, testTable]], fields=[count, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index 6226dff..90f2eb6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.catalog.CatalogManager.TableLookupResult;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.table.factories.TableSourceFactory;
@@ -79,7 +80,6 @@ class DatabaseCalciteSchema implements Schema {
                ObjectIdentifier identifier = ObjectIdentifier.of(catalogName, databaseName, tableName);
                return catalogManager.getTable(identifier)
                        .map(result -> {
-                               CatalogBaseTable table = result.getTable();
                                final TableFactory tableFactory;
                                if (result.isTemporary()) {
                                        tableFactory = null;
@@ -88,36 +88,49 @@ class DatabaseCalciteSchema implements Schema {
                                                .flatMap(Catalog::getTableFactory)
                                                .orElse(null);
                                }
-                               return convertTable(identifier, table, tableFactory);
+                               return convertTable(identifier, result, tableFactory);
                        })
                        .orElse(null);
        }
 
-       private Table convertTable(ObjectIdentifier identifier, CatalogBaseTable table, @Nullable TableFactory tableFactory) {
+       private Table convertTable(ObjectIdentifier identifier, TableLookupResult lookupResult, @Nullable TableFactory tableFactory) {
+               CatalogBaseTable table = lookupResult.getTable();
+               TableSchema resolvedSchema = lookupResult.getResolvedSchema();
                if (table instanceof QueryOperationCatalogView) {
-                       return QueryOperationCatalogViewTable.createCalciteTable(((QueryOperationCatalogView) table));
+                       return QueryOperationCatalogViewTable.createCalciteTable(
+                               ((QueryOperationCatalogView) table),
+                               resolvedSchema);
                } else if (table instanceof ConnectorCatalogTable) {
-                       return convertConnectorTable((ConnectorCatalogTable<?, ?>) table);
-               } else if (table instanceof CatalogTable) {
-                       return convertCatalogTable(identifier, (CatalogTable) table, tableFactory);
-               } else if (table instanceof CatalogView) {
-                       return convertCatalogView(identifier.getObjectName(), (CatalogView) table);
+                       return convertConnectorTable((ConnectorCatalogTable<?, ?>) table, resolvedSchema);
                } else {
-                       throw new TableException("Unsupported table type: " + table);
+                       if (table instanceof CatalogTable) {
+                               return convertCatalogTable(
+                                       identifier,
+                                       (CatalogTable) table,
+                                       resolvedSchema,
+                                       tableFactory);
+                       } else if (table instanceof CatalogView) {
+                               return convertCatalogView(
+                                       identifier.getObjectName(),
+                                       (CatalogView) table,
+                                       resolvedSchema);
+                       } else {
+                               throw new TableException("Unsupported table type: " + table);
+                       }
                }
        }
 
-       private Table convertConnectorTable(ConnectorCatalogTable<?, ?> table) {
-               Optional<TableSourceTable> tableSourceTable = table.getTableSource()
+       private Table convertConnectorTable(ConnectorCatalogTable<?, ?> table, TableSchema resolvedSchema) {
+               Optional<TableSourceTable<?>> tableSourceTable = table.getTableSource()
                        .map(tableSource -> new TableSourceTable<>(
-                               table.getSchema(),
+                               resolvedSchema,
                                tableSource,
                                !table.isBatch(),
                                FlinkStatistic.UNKNOWN()));
                if (tableSourceTable.isPresent()) {
                        return tableSourceTable.get();
                } else {
-                       Optional<TableSinkTable> tableSinkTable = table.getTableSink()
+                       Optional<TableSinkTable<?>> tableSinkTable = table.getTableSink()
                                .map(tableSink -> new TableSinkTable<>(
                                        tableSink,
                                        FlinkStatistic.UNKNOWN()));
@@ -130,13 +143,17 @@ class DatabaseCalciteSchema implements Schema {
                }
        }
 
-       private Table convertCatalogTable(ObjectIdentifier identifier, CatalogTable table, @Nullable TableFactory tableFactory) {
+       private Table convertCatalogTable(
+                       ObjectIdentifier identifier,
+                       CatalogTable table,
+                       TableSchema resolvedSchema,
+                       @Nullable TableFactory tableFactory) {
                final TableSource<?> tableSource;
                final TableSourceFactory.Context context = new TableSourceFactoryContextImpl(
                                identifier, table, tableConfig.getConfiguration());
                if (tableFactory != null) {
                        if (tableFactory instanceof TableSourceFactory) {
-                               tableSource = ((TableSourceFactory) tableFactory).createTableSource(context);
+                               tableSource = ((TableSourceFactory<?>) tableFactory).createTableSource(context);
                        } else {
                                throw new TableException(
                                        "Cannot query a sink-only table. 
TableFactory provided by catalog must implement TableSourceFactory");
@@ -150,7 +167,7 @@ class DatabaseCalciteSchema implements Schema {
                }
 
                return new TableSourceTable<>(
-                       table.getSchema(),
+                       resolvedSchema,
                        tableSource,
                        // this means the TableSource extends from StreamTableSource, this is needed for the
                        // legacy Planner. Blink Planner should use the information that comes from the TableSource
@@ -160,11 +177,10 @@ class DatabaseCalciteSchema implements Schema {
                );
        }
 
-       private Table convertCatalogView(String tableName, CatalogView table) {
-               TableSchema schema = table.getSchema();
+       private Table convertCatalogView(String tableName, CatalogView table, TableSchema resolvedSchema) {
                return new ViewTable(
                        null,
-                       typeFactory -> ((FlinkTypeFactory) typeFactory).buildLogicalRowType(schema),
+                       typeFactory -> ((FlinkTypeFactory) typeFactory).buildLogicalRowType(resolvedSchema),
                        table.getExpandedQuery(),
                        Arrays.asList(catalogName, databaseName),
                        Arrays.asList(catalogName, databaseName, tableName)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java
index dfcced1..bbbb6d4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java
@@ -51,12 +51,13 @@ public class QueryOperationCatalogViewTable extends AbstractTable implements Tra
        private final QueryOperationCatalogView catalogView;
        private final RelProtoDataType rowType;
 
-       public static QueryOperationCatalogViewTable createCalciteTable(QueryOperationCatalogView catalogView) {
+       public static QueryOperationCatalogViewTable createCalciteTable(
+                       QueryOperationCatalogView catalogView,
+                       TableSchema resolvedSchema) {
                return new QueryOperationCatalogViewTable(catalogView, typeFactory -> {
-                       TableSchema tableSchema = catalogView.getSchema();
                        final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory;
-                       final RelDataType relType = flinkTypeFactory.buildLogicalRowType(tableSchema);
-                       Boolean[] nullables = tableSchema
+                       final RelDataType relType = flinkTypeFactory.buildLogicalRowType(resolvedSchema);
+                       Boolean[] nullables = resolvedSchema
                                .getTableColumns()
                                .stream()
                                .map(c -> c.getType().getLogicalType().isNullable())
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 4e554f2..db10922 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -482,7 +482,7 @@ abstract class TableEnvImpl(
    val objectIdentifier: ObjectIdentifier = catalogManager.qualifyIdentifier(identifier)
 
     JavaScalaConversionUtil.toScala(catalogManager.getTable(objectIdentifier))
-      .map(t => new CatalogQueryOperation(objectIdentifier, t.getTable.getSchema))
+      .map(t => new CatalogQueryOperation(objectIdentifier, t.getResolvedSchema))
   }
 
   override def listModules(): Array[String] = {
