twalthr commented on a change in pull request #18349:
URL: https://github.com/apache/flink/pull/18349#discussion_r784698819



##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -263,7 +265,9 @@ public static DynamicTableSink createDynamicTableSink(
                             "Unable to create a sink for writing table 
'%s'.\n\n"

Review comment:
       same comment as above

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -235,7 +235,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
         KafkaConnectorOptionsUtil.validateDeliveryGuarantee(tableOptions);
 
         validatePKConstraints(
-                context.getObjectIdentifier(),
+                context.getIdentifier().map(ObjectIdentifier::toString).orElse("anonymous"),

Review comment:
       adapt the error message to `The Kafka table with '%s' format...`. Having `"anonymous"` in the code base is not helpful; in this case we should just omit `'%s'`.
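
   Something along these lines (a sketch only; the method shape and parameter names are simplified, not the actual code):

```java
// Sketch: render the identifier only when present, instead of a literal "anonymous".
private static void validatePKConstraints(
        @Nullable ObjectIdentifier id, String formatName, boolean pkDefined) {
    if (pkDefined) {
        final String tableDescription =
                id == null
                        ? "The Kafka table" // anonymous/inline table: just omit '%s'
                        : String.format("The Kafka table '%s'", id.asSummaryString());
        throw new ValidationException(
                String.format(
                        "%s with '%s' format doesn't support defining PRIMARY KEY constraint on the table.",
                        tableDescription, formatName));
    }
}
```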

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java
##########
@@ -84,7 +90,7 @@ public QueryOperation getChild() {
     @Override
     public String asSummaryString() {
         Map<String, Object> params = new LinkedHashMap<>();
-        params.put("identifier", tableIdentifier);
+        params.put("identifier", contextResolvedTable);

Review comment:
       `identifier` -> `table`

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This class contains information about a table, its {@link ResolvedSchema}, its options and its
+ * relationship with a {@link Catalog}, if any.
+ *
+ * <p>There can be 3 kinds of {@link ContextResolvedTable}:
+ *
+ * <ul>
+ *   <li>A permanent table: a table which is stored in a {@link Catalog} and has an associated
+ *       unique {@link ObjectIdentifier}.
+ *   <li>A temporary table: a table which is stored in the {@link CatalogManager}, has an associated
+ *       unique {@link ObjectIdentifier} and it's flagged as temporary.
+ *   <li>A anonymous/inline table: a table which is not stored in a catalog and doesn't have an
+ *       associated unique {@link ObjectIdentifier}.
+ * </ul>
+ *
+ * <p>The different handling of temporary and permanent tables is {@link Catalog} and {@link
+ * CatalogManager} instance specific, hence for these two kind of tables, an instance of this object
+ * represents the relationship between the specific {@link ResolvedCatalogBaseTable} instance and
+ * the specific {@link Catalog}/{@link CatalogManager} instances. For example, the same {@link
+ * ResolvedCatalogBaseTable} can be temporary for one catalog, but permanent for another one.
+ */
+@Internal
+public class ContextResolvedTable {
+
+    private final @Nullable ObjectIdentifier objectIdentifier;
+    private final @Nullable Catalog catalog;
+    private final ResolvedCatalogBaseTable<?> resolvedTable;
+
+    public static ContextResolvedTable permanent(
+            ObjectIdentifier identifier,
+            Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                identifier, Preconditions.checkNotNull(catalog), resolvedTable);
+    }
+
+    public static ContextResolvedTable temporary(
+            ObjectIdentifier identifier, ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(identifier, null, resolvedTable);
+    }
+
+    public static ContextResolvedTable anonymous(ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(null, null, resolvedTable);
+    }
+
+    private ContextResolvedTable(
+            @Nullable ObjectIdentifier objectIdentifier,
+            @Nullable Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable) {
+        this.objectIdentifier = objectIdentifier;
+        this.catalog = catalog;
+        this.resolvedTable = resolvedTable;
+    }
+
+    public boolean isAnonymous() {
+        return objectIdentifier == null;
+    }
+
+    /** @return true if the table is temporary. An anonymous table is always temporary. */
+    public boolean isTemporary() {
+        return catalog == null;
+    }
+
+    public boolean isPermanent() {
+        return !isTemporary();
+    }
+
+    /** Returns empty if {@link #isAnonymous()} is true. */
+    public Optional<ObjectIdentifier> getIdentifier() {
+        return Optional.ofNullable(objectIdentifier);
+    }
+
+    /** Returns empty if {@link #isPermanent()} is false. */
+    public Optional<Catalog> getCatalog() {
+        return Optional.ofNullable(catalog);
+    }
+
+    /** Returns a fully resolved catalog object. */
+    @SuppressWarnings("unchecked")
+    public <T extends ResolvedCatalogBaseTable<?>> T getResolvedTable() {
+        return (T) resolvedTable;
+    }
+
+    public ResolvedSchema getResolvedSchema() {
+        return resolvedTable.getResolvedSchema();
+    }
+
+    /** Returns the original metadata object returned by the catalog. */
+    @SuppressWarnings("unchecked")
+    public <T extends CatalogBaseTable> T getTable() {
+        return (T) resolvedTable.getOrigin();
+    }
+
+    /**
+     * Copy the {@link ContextResolvedTable}, replacing the underlying {@link CatalogTable} options.
+     */
+    public ContextResolvedTable copy(Map<String, String> newOptions) {
+        if (resolvedTable.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+            throw new TableException("Cannot copy VIEW with new options.");

Review comment:
       `new ValidationException("The view '%s' cannot be enriched with new 
options.")`

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
##########
@@ -340,14 +347,46 @@ private RelNode convertLegacyTableFunction(
         }
 
         @Override
-        public RelNode visit(CatalogQueryOperation catalogTable) {
-            ObjectIdentifier objectIdentifier = catalogTable.getTableIdentifier();
-            return relBuilder
-                    .scan(
-                            objectIdentifier.getCatalogName(),
-                            objectIdentifier.getDatabaseName(),
-                            objectIdentifier.getObjectName())
-                    .build();
+        public RelNode visit(SourceQueryOperation catalogTable) {
+            Optional<ObjectIdentifier> objectIdentifier =
+                    catalogTable.getContextResolvedTable().getIdentifier();
+            if (objectIdentifier.isPresent()) {

Review comment:
       can't we just create a helper method that takes `ContextResolvedTable`? We don't need to perform another lookup.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StatementSetImpl.java
##########
@@ -76,10 +78,23 @@ public StatementSet addInsert(String targetPath, Table table, boolean overwrite)
                 tableEnvironment.getParser().parseIdentifier(targetPath);
        ObjectIdentifier objectIdentifier =
                tableEnvironment.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+        ContextResolvedTable contextResolvedTable =
+                tableEnvironment
+                        .getCatalogManager()
+                        .getTable(objectIdentifier)
+                        .orElseThrow(
+                                () ->
+                                        new TableException(

Review comment:
       Instead of coming up with an exception at every location, introduce a `CatalogManager.getTableOrError`.
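
   Sketch of what I have in mind (the exact message wording is just a suggestion):

```java
// In CatalogManager: one well-defined exception for all call sites.
public ContextResolvedTable getTableOrError(ObjectIdentifier objectIdentifier) {
    return getTable(objectIdentifier)
            .orElseThrow(
                    () ->
                            new TableException(
                                    String.format(
                                            "Cannot find table '%s' in any of the catalogs %s.",
                                            objectIdentifier.asSummaryString(), listCatalogs())));
}
```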

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SourceQueryOperation.java
##########
@@ -56,7 +57,12 @@ public ResolvedSchema getResolvedSchema() {
     @Override
     public String asSummaryString() {
         Map<String, Object> args = new LinkedHashMap<>();
-        args.put("identifier", this.contextResolvedTable.toString());
+        args.put(
+                "identifier",
+                this.contextResolvedTable

Review comment:
       improve this

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
##########
@@ -74,15 +74,14 @@ public Table getTable(String tableName) {
                 .map(
                         lookupResult ->
                                 new CatalogSchemaTable(
-                                        identifier,
                                         lookupResult,
                                         getStatistic(lookupResult, identifier),
                                         isStreamingMode))
                 .orElse(null);
     }
 
     private FlinkStatistic getStatistic(
-            TableLookupResult lookupResult, ObjectIdentifier identifier) {
+            ContextResolvedTable lookupResult, ObjectIdentifier identifier) {

Review comment:
       rename variables in this class

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
##########
@@ -287,27 +289,28 @@ public RexNode visitInputRef(RexInputRef inputRef) {
             if (!catalogOptional.isPresent()) {
                 throw new TableException(
                         String.format(
-                                "Table %s must from a catalog, but %s is not a 
catalog",
-                                identifier.asSummaryString(), 
identifier.getCatalogName()));
+                                "Table '%s' factory doesn't provide 
partitions, and it cannot be loaded from the catalog",

Review comment:
       `factory` -> `connector`
   
   It is nice that you would like to improve the exception. But please do this 
in a hotfix commit. It is useful on its own. Or collect all minor improvements 
in one umbrella commit.

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
##########
@@ -56,10 +56,10 @@ object TableSinkUtils {
     * @param partitionKeys The partition keys of this table.
     */
   def validateTableSink(
-      sinkOperation: CatalogSinkModifyOperation,
-      sinkIdentifier: ObjectIdentifier,
-      sink: TableSink[_],
-      partitionKeys: Seq[String]): Unit = {
+                         sinkOperation: SinkModifyOperation,

Review comment:
       fix indentation

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExternalModifyOperation.java
##########
@@ -96,7 +79,6 @@ public ResolvedSchema getResolvedSchema() {
     @Override
     public String asSummaryString() {
         final Map<String, Object> args = new LinkedHashMap<>();
-        args.put("identifier", tableIdentifier);

Review comment:
       add an `asSummaryString` to `ContextResolvedTable` to make this summary string helpful again
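
   Sketch (how to render the anonymous case is an assumption):

```java
// In ContextResolvedTable: a dedicated summary string for operation summaries.
public String asSummaryString() {
    return objectIdentifier != null ? objectIdentifier.asSummaryString() : "*anonymous*";
}
```

   The call site here then becomes `args.put("table", contextResolvedTable.asSummaryString());`.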

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This class contains information about a table, its {@link ResolvedSchema}, its options and its
+ * relationship with a {@link Catalog}, if any.
+ *
+ * <p>There can be 3 kinds of {@link ContextResolvedTable}:
+ *
+ * <ul>
+ *   <li>A permanent table: a table which is stored in a {@link Catalog} and has an associated
+ *       unique {@link ObjectIdentifier}.
+ *   <li>A temporary table: a table which is stored in the {@link CatalogManager}, has an associated
+ *       unique {@link ObjectIdentifier} and it's flagged as temporary.
+ *   <li>A anonymous/inline table: a table which is not stored in a catalog and doesn't have an
+ *       associated unique {@link ObjectIdentifier}.
+ * </ul>
+ *
+ * <p>The different handling of temporary and permanent tables is {@link Catalog} and {@link
+ * CatalogManager} instance specific, hence for these two kind of tables, an instance of this object
+ * represents the relationship between the specific {@link ResolvedCatalogBaseTable} instance and
+ * the specific {@link Catalog}/{@link CatalogManager} instances. For example, the same {@link
+ * ResolvedCatalogBaseTable} can be temporary for one catalog, but permanent for another one.
+ */
+@Internal
+public class ContextResolvedTable {
+
+    private final @Nullable ObjectIdentifier objectIdentifier;
+    private final @Nullable Catalog catalog;
+    private final ResolvedCatalogBaseTable<?> resolvedTable;
+
+    public static ContextResolvedTable permanent(
+            ObjectIdentifier identifier,
+            Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                identifier, Preconditions.checkNotNull(catalog), resolvedTable);
+    }
+
+    public static ContextResolvedTable temporary(
+            ObjectIdentifier identifier, ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(identifier, null, resolvedTable);

Review comment:
       add `Preconditions.checkNotNull`
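
   i.e. mirror what `permanent(...)` already does (sketch):

```java
public static ContextResolvedTable temporary(
        ObjectIdentifier identifier, ResolvedCatalogBaseTable<?> resolvedTable) {
    return new ContextResolvedTable(
            Preconditions.checkNotNull(identifier),
            null,
            Preconditions.checkNotNull(resolvedTable));
}
```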

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
##########
@@ -80,9 +83,21 @@
     @PublicEvolving
     interface Context {
 
-        /** Returns the identifier of the table in the {@link Catalog}. */
+        /**
+         * Returns the identifier of the table in the {@link Catalog}.
+         *
+         * @deprecated Because the table {@link ObjectIdentifier} could be null (e.g. for anonymous
+         *     tables), you should use {@link #getIdentifier()} instead.
+         */
+        @Deprecated
+        @Nullable
         ObjectIdentifier getObjectIdentifier();
 
+        /** Returns the identifier of the table in the {@link Catalog}, if any. */

Review comment:
       Add: `The identifier is empty for anonymous/inline tables defined in Table API.`

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This class contains information about a table, its {@link ResolvedSchema}, its options and its
+ * relationship with a {@link Catalog}, if any.
+ *
+ * <p>There can be 3 kinds of {@link ContextResolvedTable}:
+ *
+ * <ul>
+ *   <li>A permanent table: a table which is stored in a {@link Catalog} and has an associated
+ *       unique {@link ObjectIdentifier}.
+ *   <li>A temporary table: a table which is stored in the {@link CatalogManager}, has an associated
+ *       unique {@link ObjectIdentifier} and it's flagged as temporary.

Review comment:
       `it's` -> `is`

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This class contains information about a table, its {@link ResolvedSchema}, its options and its
+ * relationship with a {@link Catalog}, if any.
+ *
+ * <p>There can be 3 kinds of {@link ContextResolvedTable}:
+ *
+ * <ul>
+ *   <li>A permanent table: a table which is stored in a {@link Catalog} and has an associated
+ *       unique {@link ObjectIdentifier}.
+ *   <li>A temporary table: a table which is stored in the {@link CatalogManager}, has an associated
+ *       unique {@link ObjectIdentifier} and it's flagged as temporary.
+ *   <li>A anonymous/inline table: a table which is not stored in a catalog and doesn't have an

Review comment:
       `A` -> `An`

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This class contains information about a table, its {@link ResolvedSchema}, its options and its
+ * relationship with a {@link Catalog}, if any.
+ *
+ * <p>There can be 3 kinds of {@link ContextResolvedTable}:
+ *
+ * <ul>
+ *   <li>A permanent table: a table which is stored in a {@link Catalog} and has an associated
+ *       unique {@link ObjectIdentifier}.
+ *   <li>A temporary table: a table which is stored in the {@link CatalogManager}, has an associated
+ *       unique {@link ObjectIdentifier} and it's flagged as temporary.
+ *   <li>A anonymous/inline table: a table which is not stored in a catalog and doesn't have an
+ *       associated unique {@link ObjectIdentifier}.
+ * </ul>
+ *
+ * <p>The different handling of temporary and permanent tables is {@link Catalog} and {@link
+ * CatalogManager} instance specific, hence for these two kind of tables, an instance of this object
+ * represents the relationship between the specific {@link ResolvedCatalogBaseTable} instance and
+ * the specific {@link Catalog}/{@link CatalogManager} instances. For example, the same {@link
+ * ResolvedCatalogBaseTable} can be temporary for one catalog, but permanent for another one.
+ */
+@Internal
+public class ContextResolvedTable {
+
+    private final @Nullable ObjectIdentifier objectIdentifier;
+    private final @Nullable Catalog catalog;
+    private final ResolvedCatalogBaseTable<?> resolvedTable;
+
+    public static ContextResolvedTable permanent(
+            ObjectIdentifier identifier,
+            Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                identifier, Preconditions.checkNotNull(catalog), resolvedTable);
+    }
+
+    public static ContextResolvedTable temporary(
+            ObjectIdentifier identifier, ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(identifier, null, resolvedTable);
+    }
+
+    public static ContextResolvedTable anonymous(ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(null, null, resolvedTable);
+    }
+
+    private ContextResolvedTable(
+            @Nullable ObjectIdentifier objectIdentifier,
+            @Nullable Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable) {
+        this.objectIdentifier = objectIdentifier;
+        this.catalog = catalog;
+        this.resolvedTable = resolvedTable;
+    }
+
+    public boolean isAnonymous() {
+        return objectIdentifier == null;
+    }
+
+    /** @return true if the table is temporary. An anonymous table is always temporary. */
+    public boolean isTemporary() {
+        return catalog == null;
+    }
+
+    public boolean isPermanent() {
+        return !isTemporary();
+    }
+
+    /** Returns empty if {@link #isAnonymous()} is true. */
+    public Optional<ObjectIdentifier> getIdentifier() {
+        return Optional.ofNullable(objectIdentifier);
+    }
+
+    /** Returns empty if {@link #isPermanent()} is false. */
+    public Optional<Catalog> getCatalog() {
+        return Optional.ofNullable(catalog);
+    }
+
+    /** Returns a fully resolved catalog object. */
+    @SuppressWarnings("unchecked")
+    public <T extends ResolvedCatalogBaseTable<?>> T getResolvedTable() {
+        return (T) resolvedTable;
+    }
+
+    public ResolvedSchema getResolvedSchema() {
+        return resolvedTable.getResolvedSchema();
+    }
+
+    /** Returns the original metadata object returned by the catalog. */
+    @SuppressWarnings("unchecked")
+    public <T extends CatalogBaseTable> T getTable() {
+        return (T) resolvedTable.getOrigin();
+    }
+
+    /**
+     * Copy the {@link ContextResolvedTable}, replacing the underlying {@link CatalogTable} options.
+     */
+    public ContextResolvedTable copy(Map<String, String> newOptions) {
+        if (resolvedTable.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+            throw new TableException("Cannot copy VIEW with new options.");
+        }
+        return new ContextResolvedTable(
+                objectIdentifier, catalog, ((ResolvedCatalogTable) resolvedTable).copy(newOptions));
+    }
+
+    @Override
+    public String toString() {

Review comment:
       let's skip this method and the method below in this commit; I see you added more logic in the next one.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SourceQueryOperation.java
##########
@@ -19,40 +19,45 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-/** Describes a relational operation that was created from a lookup to a catalog. */
+/**
+ * Describes a query operation from a {@link ContextResolvedTable}.
+ *
+ * <p>The source table is described by {@link #getContextResolvedTable()}, and in general is used
+ * for every source which implementation is defined with {@link DynamicTableSource}. {@code
+ * DataStream} sources are handled by {@code DataStreamQueryOperation}.

Review comment:
       `DataStreamQueryOperation` -> `ExternalQueryOperation`

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SourceQueryOperation.java
##########
@@ -19,40 +19,45 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-/** Describes a relational operation that was created from a lookup to a catalog. */
+/**
+ * Describes a query operation from a {@link ContextResolvedTable}.
+ *
+ * <p>The source table is described by {@link #getContextResolvedTable()}, and in general is used
+ * for every source which implementation is defined with {@link DynamicTableSource}. {@code
+ * DataStream} sources are handled by {@code DataStreamQueryOperation}.
+ */
 @Internal
-public class CatalogQueryOperation implements QueryOperation {
+public class SourceQueryOperation implements QueryOperation {
 
-    private final ObjectIdentifier tableIdentifier;
-    private final ResolvedSchema resolvedSchema;
+    private final ContextResolvedTable contextResolvedTable;
 
-    public CatalogQueryOperation(ObjectIdentifier tableIdentifier, ResolvedSchema resolvedSchema) {
-        this.tableIdentifier = tableIdentifier;
-        this.resolvedSchema = resolvedSchema;
+    public SourceQueryOperation(ContextResolvedTable contextResolvedTable) {
+        this.contextResolvedTable = contextResolvedTable;
     }
 
-    public ObjectIdentifier getTableIdentifier() {
-        return tableIdentifier;
+    public ContextResolvedTable getContextResolvedTable() {
+        return contextResolvedTable;
     }
 
     @Override
     public ResolvedSchema getResolvedSchema() {
-        return resolvedSchema;
+        return this.contextResolvedTable.getResolvedSchema();

Review comment:
       nit: remove `this.`

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -160,7 +160,9 @@ public static DynamicTableSource createDynamicTableSource(
                             "Unable to create a source for reading table 
'%s'.\n\n"
                                     + "Table options are:\n\n"
                                     + "%s",
-                            objectIdentifier.asSummaryString(),
+                            objectIdentifier != null

Review comment:
       Improve the error message to `Unable to create a source for reading the table.`
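
   For example, with a small helper (a sketch; the helper name is not actual `FactoryUtil` code):

```java
private static String tableDescription(@Nullable ObjectIdentifier objectIdentifier) {
    return objectIdentifier == null
            ? "the table" // anonymous/inline table: nothing useful to print
            : String.format("table '%s'", objectIdentifier.asSummaryString());
}
```

   The format string then becomes `"Unable to create a source for reading %s.\n\n..."` and the sink side can reuse the same helper.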

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SourceQueryOperation.java
##########
@@ -19,40 +19,45 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-/** Describes a relational operation that was created from a lookup to a catalog. */
+/**
+ * Describes a query operation from a {@link ContextResolvedTable}.
+ *
+ * <p>The source table is described by {@link #getContextResolvedTable()}, and in general is used
+ * for every source which implementation is defined with {@link DynamicTableSource}. {@code
+ * DataStream} sources are handled by {@code DataStreamQueryOperation}.
+ */
 @Internal
-public class CatalogQueryOperation implements QueryOperation {
+public class SourceQueryOperation implements QueryOperation {
 
-    private final ObjectIdentifier tableIdentifier;
-    private final ResolvedSchema resolvedSchema;
+    private final ContextResolvedTable contextResolvedTable;
 
-    public CatalogQueryOperation(ObjectIdentifier tableIdentifier, ResolvedSchema resolvedSchema) {
-        this.tableIdentifier = tableIdentifier;
-        this.resolvedSchema = resolvedSchema;
+    public SourceQueryOperation(ContextResolvedTable contextResolvedTable) {
+        this.contextResolvedTable = contextResolvedTable;
     }
 
-    public ObjectIdentifier getTableIdentifier() {
-        return tableIdentifier;
+    public ContextResolvedTable getContextResolvedTable() {
+        return contextResolvedTable;
     }
 
     @Override
     public ResolvedSchema getResolvedSchema() {
-        return resolvedSchema;
+        return this.contextResolvedTable.getResolvedSchema();
     }
 
     @Override
     public String asSummaryString() {
         Map<String, Object> args = new LinkedHashMap<>();
-        args.put("identifier", tableIdentifier);
-        args.put("fields", resolvedSchema.getColumnNames());
+        args.put("identifier", this.contextResolvedTable.toString());

Review comment:
       `identifier` -> `table`

##########
File path: 
flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/operations/ExternalQueryOperation.java
##########
@@ -93,12 +82,12 @@ public ChangelogMode getChangelogMode() {
     @Override
     public String asSummaryString() {
         final Map<String, Object> args = new LinkedHashMap<>();
-        args.put("identifier", identifier);
+        args.put("identifier", contextResolvedTable);

Review comment:
       `identifier` -> `table`

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
##########
@@ -1375,10 +1375,9 @@ default Table limit(int offset, int fetch) {
     * table (backed by a {@link DynamicTableSink}) expressed via the given {@link TableDescriptor}.
     * It executes the insert operation.
     *
-     * <p>The {@link TableDescriptor descriptor} is registered as an inline (i.e. anonymous)
-     * temporary catalog table (see {@link TableEnvironment#createTemporaryTable(String,
-     * TableDescriptor)}) using a unique identifier. Note that calling this method multiple times,
-     * even with the same descriptor, results in multiple sink tables being registered.
+     * <p>The {@link TableDescriptor descriptor} won't be registered in the catalog, but it will be
+     * propagated directly in the operation tree. Note that calling this method multiple times, even
+     * with the same descriptor, results in multiple sink tables being registered.

Review comment:
       `registered` is not the right word here anymore

##########
File path: 
flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
##########
@@ -202,17 +207,12 @@ public static Executor lookupExecutor(
         final ResolvedSchema resolvedSchema =
                 schemaResolver.resolve(schemaTranslationResult.getSchema());
 
-        final UnresolvedIdentifier unresolvedIdentifier =
-                UnresolvedIdentifier.of(
-                        "Unregistered_DataStream_Sink_" + 
ExternalModifyOperation.getUniqueId());
-        final ObjectIdentifier objectIdentifier =
-                catalogManager.qualifyIdentifier(unresolvedIdentifier);
-
         final ExternalModifyOperation modifyOperation =
                 new ExternalModifyOperation(
-                        objectIdentifier,
+                        ContextResolvedTable.anonymous(
+                                new ResolvedCatalogTable(

Review comment:
       Same comment as above: call `CatalogManager.resolveCatalogTable` directly.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StatementSet.java
##########
@@ -62,10 +62,10 @@
     * written to a table (backed by a {@link DynamicTableSink}) expressed via the given {@link
     * TableDescriptor}.
     *
-     * <p>The given {@link TableDescriptor descriptor} is registered as an inline (i.e. anonymous)
-     * temporary catalog table (see {@link TableEnvironment#createTemporaryTable(String,
-     * TableDescriptor)}. Then a statement is added to the statement set that inserts the {@link
-     * Table} object's pipeline into that temporary table.
+     * <p>The given {@link TableDescriptor descriptor} won't be registered in the catalog, but it
+     * will be propagated directly in the operation tree, adding a statement to the statement set

Review comment:
       Very long sentence that mentions descriptor two times. Break it up into 
two sentences?

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1161,7 +1160,7 @@ public TableResultInternal executeInternal(Operation operation) {
         } else if (operation instanceof ShowCreateTableOperation) {
             ShowCreateTableOperation showCreateTableOperation =
                     (ShowCreateTableOperation) operation;
-            Optional<CatalogManager.TableLookupResult> result =
+            Optional<ContextResolvedTable> result =

Review comment:
       nit: maybe rename the variable here and below?

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1513,10 +1512,9 @@ private TableResultInternal buildResult(String[] headers, DataType[] types, Obje
         List<String> tableNames = new ArrayList<>(operations.size());
         Map<String, Integer> tableNameToCount = new HashMap<>();
         for (ModifyOperation operation : operations) {
-            if (operation instanceof CatalogSinkModifyOperation) {
-                ObjectIdentifier identifier =
-                        ((CatalogSinkModifyOperation) operation).getTableIdentifier();
-                String fullName = identifier.asSummaryString();
+            if (operation instanceof SinkModifyOperation) {
+                String fullName =
+                        ((SinkModifyOperation) operation).getContextResolvedTable().toString();

Review comment:
       again, we should not use `toString` but dedicated methods that indicate 
the importance like `asSummaryString`. There are too many strings floating 
around in the stack.

##########
File path: 
flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
##########
@@ -138,31 +141,33 @@ public static Executor lookupExecutor(
        final SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
        final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder();
 
-        final UnresolvedIdentifier unresolvedIdentifier;
-        if (viewPath != null) {
-            unresolvedIdentifier = getParser().parseIdentifier(viewPath);
-        } else {
-            unresolvedIdentifier =
-                    UnresolvedIdentifier.of("Unregistered_DataStream_Source_" 
+ dataStream.getId());
-        }
-        final ObjectIdentifier objectIdentifier =
-                catalogManager.qualifyIdentifier(unresolvedIdentifier);
-
         final SchemaTranslator.ConsumingResult schemaTranslationResult =
                 SchemaTranslator.createConsumingResult(
                        catalogManager.getDataTypeFactory(), dataStream.getType(), schema);
 
         final ResolvedSchema resolvedSchema =
                 schemaTranslationResult.getSchema().resolve(schemaResolver);
+        final ResolvedCatalogTable resolvedCatalogTable =
+                new ResolvedCatalogTable(new ExternalCatalogTable(resolvedSchema), resolvedSchema);

Review comment:
       `ExternalCatalogTable` could also take the unresolved schema directly, and we just pass the whole thing to `CatalogManager.resolveCatalogTable`; no special logic required.
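
   Sketch of the call site (assuming `ExternalCatalogTable` gets a constructor taking the unresolved `Schema`):

```java
final ResolvedCatalogTable resolvedCatalogTable =
        catalogManager.resolveCatalogTable(
                new ExternalCatalogTable(schemaTranslationResult.getSchema()));
```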

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CollectModifyOperation.java
##########
@@ -48,19 +45,14 @@
     // QueryOperation.getResolvedSchema()
     private DataType consumedDataType;
 
-    public CollectModifyOperation(ObjectIdentifier tableIdentifier, QueryOperation child) {
-        this.tableIdentifier = tableIdentifier;
+    public CollectModifyOperation(QueryOperation child) {
         this.child = child;
     }
 
     public static int getUniqueId() {

Review comment:
       can this be removed as well?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
##########
@@ -340,14 +347,46 @@ private RelNode convertLegacyTableFunction(
         }
 
         @Override
-        public RelNode visit(CatalogQueryOperation catalogTable) {
-            ObjectIdentifier objectIdentifier = catalogTable.getTableIdentifier();
-            return relBuilder
-                    .scan(
-                            objectIdentifier.getCatalogName(),
-                            objectIdentifier.getDatabaseName(),
-                            objectIdentifier.getObjectName())
-                    .build();
+        public RelNode visit(SourceQueryOperation catalogTable) {
+            Optional<ObjectIdentifier> objectIdentifier =
+                    catalogTable.getContextResolvedTable().getIdentifier();
+            if (objectIdentifier.isPresent()) {
+                return relBuilder
+                        .scan(
+                                objectIdentifier.get().getCatalogName(),
+                                objectIdentifier.get().getDatabaseName(),
+                                objectIdentifier.get().getObjectName())
+                        .build();
+            }
+            return createAnonymousLogicalTableScan(catalogTable);
+        }
+
+        /** This manually creates the logical table scan to skip the calcite catalog resolution. */
+        private RelNode createAnonymousLogicalTableScan(SourceQueryOperation catalogTable) {
+            // Statistics are unknown for anonymous tables
+            // Look at DatabaseCalciteSchema#getStatistic for more details
+            FlinkStatistic flinkStatistic =

Review comment:
       nit: maybe move all this code to `CatalogSourceTable.createAnonymous(ContextResolvedTable, boolean isBatchMode)`
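
   Sketch of the factory I have in mind (the constructor arguments are assumptions about the planner internals):

```java
// In CatalogSourceTable:
public static CatalogSourceTable createAnonymous(
        RelOptSchema relOptSchema,
        RelDataTypeFactory typeFactory,
        ContextResolvedTable contextResolvedTable,
        boolean isBatchMode) {
    Preconditions.checkArgument(contextResolvedTable.isAnonymous(), "Table must be anonymous.");
    // Statistics are unknown for anonymous tables; see DatabaseCalciteSchema#getStatistic.
    final FlinkStatistic statistic =
            FlinkStatistic.builder().tableStats(TableStats.UNKNOWN).build();
    final CatalogSchemaTable schemaTable =
            new CatalogSchemaTable(contextResolvedTable, statistic, !isBatchMode);
    return new CatalogSourceTable(
            relOptSchema,
            Collections.emptyList(), // anonymous: the table has no path in the schema
            schemaTable.getRowType(typeFactory),
            schemaTable);
}
```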

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java
##########
@@ -202,8 +202,8 @@ public CatalogSinkModifyOperation createInsertOperation(
        UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(targetTablePath);
        ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
 
-        return new CatalogSinkModifyOperation(
-                identifier,
+        return new SinkModifyOperation(
+                catalogManager.getTable(identifier).get(),

Review comment:
       use the previously mentioned `getTableOrError` everywhere; it shouldn't fail, but if it does we have a nice exception

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
##########
@@ -137,10 +142,12 @@
     private final AggregateVisitor aggregateVisitor = new AggregateVisitor();
    private final TableAggregateVisitor tableAggregateVisitor = new TableAggregateVisitor();
    private final JoinExpressionVisitor joinExpressionVisitor = new JoinExpressionVisitor();
+    private final boolean isStreamingMode;
 
-    public QueryOperationConverter(FlinkRelBuilder relBuilder) {
+    public QueryOperationConverter(FlinkRelBuilder relBuilder, boolean isStreamingMode) {

Review comment:
       nit: use `isBatchMode`, because batch is a special case of streaming; this is kind of inconsistent in the code base right now

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
##########
@@ -340,14 +347,46 @@ private RelNode convertLegacyTableFunction(
         }
 
         @Override
-        public RelNode visit(CatalogQueryOperation catalogTable) {
-            ObjectIdentifier objectIdentifier = catalogTable.getTableIdentifier();
-            return relBuilder
-                    .scan(
-                            objectIdentifier.getCatalogName(),
-                            objectIdentifier.getDatabaseName(),
-                            objectIdentifier.getObjectName())
-                    .build();
+        public RelNode visit(SourceQueryOperation catalogTable) {
+            Optional<ObjectIdentifier> objectIdentifier =
+                    catalogTable.getContextResolvedTable().getIdentifier();
+            if (objectIdentifier.isPresent()) {
+                return relBuilder
+                        .scan(
+                                objectIdentifier.get().getCatalogName(),
+                                objectIdentifier.get().getDatabaseName(),
+                                objectIdentifier.get().getObjectName())
+                        .build();
+            }
+            return createAnonymousLogicalTableScan(catalogTable);
+        }
+
+        /** This manually creates the logical table scan to skip the calcite catalog resolution. */
+        private RelNode createAnonymousLogicalTableScan(SourceQueryOperation catalogTable) {

Review comment:
       give the parameter a better name

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
##########
@@ -593,8 +606,20 @@ public TableResult executeInsert(TableDescriptor descriptor, boolean overwrite)
        final TableDescriptor updatedDescriptor =
                descriptor.toBuilder().schema(schemaTranslationResult.getSchema()).build();
 
-        tableEnvironment.createTemporaryTable(path, updatedDescriptor);
-        return executeInsert(path, overwrite);
+        final ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable =
+                tableEnvironment
+                        .getCatalogManager()
+                        .resolveCatalogBaseTable(updatedDescriptor.toCatalogTable());
+
+        ModifyOperation operation =
+                new SinkModifyOperation(
+                        ContextResolvedTable.anonymous(resolvedCatalogBaseTable),
+                        getQueryOperation(),
+                        Collections.emptyMap(),
+                        overwrite,
+                        Collections.emptyMap());
+
        return tableEnvironment.executeInternal(Collections.singletonList(operation));

Review comment:
       reduce code duplication a bit by having a private `executeInsert` that takes `(ContextResolvedTable, boolean overwrite)`
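
   Sketch:

```java
private TableResult executeInsert(ContextResolvedTable contextResolvedTable, boolean overwrite) {
    final ModifyOperation operation =
            new SinkModifyOperation(
                    contextResolvedTable,
                    getQueryOperation(),
                    Collections.emptyMap(), // static partitions
                    overwrite,
                    Collections.emptyMap()); // dynamic options
    return tableEnvironment.executeInternal(Collections.singletonList(operation));
}
```

   Both the path-based and the descriptor-based `executeInsert` can then delegate to it.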

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -210,7 +210,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 startupOptions.startupMode,
                 startupOptions.specificOffsets,
                 startupOptions.startupTimestampMillis,
-                context.getIdentifier().map(ObjectIdentifier::toString).orElse("anonymous"));
+                context.getIdentifier().map(ObjectIdentifier::asSummaryString).orElse("anonymous"));

Review comment:
       let's introduce another context identifier

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
##########
@@ -80,28 +79,28 @@ public static RelNode convertDataStreamToRel(
             boolean isBatchMode,
             ReadableConfig config,
             FlinkRelBuilder relBuilder,
-            ObjectIdentifier identifier,
-            ResolvedSchema schema,
+            ContextResolvedTable contextResolvedTable,
             DataStream<?> dataStream,
             DataType physicalDataType,
             boolean isTopLevelRecord,
             ChangelogMode changelogMode) {
-        final CatalogTable unresolvedTable = new InlineCatalogTable(schema);
-        final ResolvedCatalogTable catalogTable = new ResolvedCatalogTable(unresolvedTable, schema);
         final DynamicTableSource tableSource =
                 new ExternalDynamicSource<>(
-                        identifier, dataStream, physicalDataType, isTopLevelRecord, changelogMode);
+                        dataStream, physicalDataType, isTopLevelRecord, changelogMode);
         final FlinkStatistic statistic =
                 FlinkStatistic.builder()
                        // this is a temporary solution, FLINK-15123 will resolve this

Review comment:
       This exists too many times; create a method `FlinkStatistic.create(ResolvedSchema)` or so.
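
   Sketch (keeps the FLINK-15123 workaround in a single place; the builder usage is an assumption):

```java
// In FlinkStatistic:
public static FlinkStatistic create(ResolvedSchema schema) {
    // Temporary solution until FLINK-15123: derive unique keys from the primary key, if any.
    final Set<Set<String>> uniqueKeys =
            schema.getPrimaryKey()
                    .map(pk -> Collections.<Set<String>>singleton(new HashSet<>(pk.getColumns())))
                    .orElse(null);
    return FlinkStatistic.builder().uniqueKeys(uniqueKeys).build();
}
```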

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSource.java
##########
@@ -126,11 +121,8 @@ private String generateOperatorName() {
 
     private String generateOperatorDesc() {
         return String.format(
-                "DataSteamToTable(stream=%s, type=%s, rowtime=%s, 
watermark=%s)",
-                identifier.asSummaryString(),
-                physicalDataType.toString(),
-                produceRowtimeMetadata,
-                propagateWatermark);
+                "DataSteamToTable(type=%s, rowtime=%s, watermark=%s)",

Review comment:
       re-add an identifier in the summary

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
##########
@@ -215,7 +216,7 @@ public void testTableSource() {
                         StartupMode.SPECIFIC_OFFSETS,
                         specificOffsets,
                         0);
-        assertEquals(actualKafkaSource, expectedKafkaSource);
+        Assertions.assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);

Review comment:
       static import

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableASOperation.java
##########
@@ -26,25 +26,32 @@
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.function.Supplier;
 
 /** Operation to describe a CREATE TABLE AS statement. */
 @Internal
 public class CreateTableASOperation implements CreateOperation {
 
     private final CreateTableOperation createTableOperation;
-    private final SinkModifyOperation insertOperation;
+    private final Supplier<SinkModifyOperation> insertOperationFactory;
+
+    private SinkModifyOperation insertOperation;

Review comment:
       no mutability in operations; pass the resolved table to `getInsertOperation(...)` instead.
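
   i.e. something like this (sketch; a `Function` replaces the `Supplier` and the mutable field):

```java
public class CreateTableASOperation implements CreateOperation {

    private final CreateTableOperation createTableOperation;
    private final Function<ContextResolvedTable, SinkModifyOperation> insertOperationFactory;

    public CreateTableASOperation(
            CreateTableOperation createTableOperation,
            Function<ContextResolvedTable, SinkModifyOperation> insertOperationFactory) {
        this.createTableOperation = createTableOperation;
        this.insertOperationFactory = insertOperationFactory;
    }

    // The caller resolves the freshly created table and passes it in; nothing is cached.
    public SinkModifyOperation getInsertOperation(ContextResolvedTable resolvedTable) {
        return insertOperationFactory.apply(resolvedTable);
    }
}
```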

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
##########
@@ -18,52 +18,52 @@
 
 package org.apache.flink.table.planner.plan.schema
 
-import org.apache.flink.table.catalog.{ObjectIdentifier, ResolvedCatalogTable}
+import org.apache.flink.table.catalog.ContextResolvedTable
 import org.apache.flink.table.connector.source.DynamicTableSource
 import org.apache.flink.table.planner.calcite.FlinkContext
 import org.apache.flink.table.planner.connectors.DynamicSourceUtils
 import org.apache.flink.table.planner.plan.abilities.source.{SourceAbilityContext, SourceAbilitySpec}
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
 
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.RelOptSchema
 import org.apache.calcite.rel.`type`.RelDataType
 
 import java.util
+import java.util.Collections
 
 /**
 * A [[FlinkPreparingTableBase]] implementation which defines the context variables
 * required to translate the Calcite [[org.apache.calcite.plan.RelOptTable]] to the Flink specific
  * relational expression with [[DynamicTableSource]].
  *
  * @param relOptSchema The RelOptSchema that this table comes from
- * @param tableIdentifier The full path of the table to retrieve.
  * @param rowType The table row type
  * @param statistic The table statistics
 * @param tableSource The [[DynamicTableSource]] for which is converted to a Calcite Table
 * @param isStreamingMode A flag that tells if the current table is in stream mode
- * @param catalogTable Resolved catalog table where this table source table comes from
+ * @param contextResolvedTable Resolved catalog table where this table source table comes from
 * @param flinkContext The flink context which is used to generate extra digests based on
  *                     abilitySpecs
  * @param abilitySpecs The abilitySpecs applied to the source
  */
 class TableSourceTable(
     relOptSchema: RelOptSchema,
-    val tableIdentifier: ObjectIdentifier,
     rowType: RelDataType,
     statistic: FlinkStatistic,
     val tableSource: DynamicTableSource,
     val isStreamingMode: Boolean,
-    val catalogTable: ResolvedCatalogTable,
+    val contextResolvedTable: ContextResolvedTable,
     val flinkContext: FlinkContext,
     val abilitySpecs: Array[SourceAbilitySpec] = Array.empty)
   extends FlinkPreparingTableBase(
     relOptSchema,
     rowType,
-    util.Arrays.asList(
-      tableIdentifier.getCatalogName,
-      tableIdentifier.getDatabaseName,
-      tableIdentifier.getObjectName),
+    toScala(contextResolvedTable.getIdentifier).map(id => util.Arrays.asList(

Review comment:
       improve formatting here or introduce a helper method
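
   For example, a Java-side helper on `ContextResolvedTable` (the name is a suggestion):

```java
/** Returns the identifier as a list of names, or an empty list for anonymous tables. */
public List<String> getIdentifierNames() {
    return getIdentifier()
            .map(id -> Arrays.asList(id.getCatalogName(), id.getDatabaseName(), id.getObjectName()))
            .orElse(Collections.emptyList());
}
```

   The constructor argument here then collapses to `contextResolvedTable.getIdentifierNames()`.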

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java
##########
@@ -114,24 +110,31 @@ private ResolvedCatalogTable createFinalCatalogTable(
                            FlinkHints.HINT_NAME_OPTIONS,
                            TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED.key()));
         }
-        return catalogTable.copy(
-                FlinkHints.mergeTableOptions(hintedOptions, catalogTable.getOptions()));
+        return schemaTable
+                .getContextResolvedTable()

Review comment:
       nit: store the context resolved table in a local variable to improve code readability

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StatementSetImpl.java
##########
@@ -76,10 +78,23 @@ public StatementSet addInsert(String targetPath, Table table, boolean overwrite)
                 tableEnvironment.getParser().parseIdentifier(targetPath);
        ObjectIdentifier objectIdentifier =
                tableEnvironment.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+        ContextResolvedTable contextResolvedTable =
+                tableEnvironment
+                        .getCatalogManager()
+                        .getTable(objectIdentifier)
+                        .orElseThrow(
+                                () ->
+                                        new TableException(

Review comment:
       nit: introduce a local variable when referencing the catalog manager multiple times

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
##########
@@ -340,14 +347,46 @@ private RelNode convertLegacyTableFunction(
         }
 
         @Override
-        public RelNode visit(CatalogQueryOperation catalogTable) {
-            ObjectIdentifier objectIdentifier = catalogTable.getTableIdentifier();
-            return relBuilder
-                    .scan(
-                            objectIdentifier.getCatalogName(),
-                            objectIdentifier.getDatabaseName(),
-                            objectIdentifier.getObjectName())
-                    .build();
+        public RelNode visit(SourceQueryOperation catalogTable) {
+            Optional<ObjectIdentifier> objectIdentifier =
+                    catalogTable.getContextResolvedTable().getIdentifier();
+            if (objectIdentifier.isPresent()) {

Review comment:
       or do we need to do this for the statistics?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

