twalthr commented on a change in pull request #18427:
URL: https://github.com/apache/flink/pull/18427#discussion_r790700555



##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
##########
@@ -80,7 +80,25 @@
     @PublicEvolving
     interface Context {
 
-        /** Returns the identifier of the table in the {@link Catalog}. */
+        /**
+         * Returns the identifier of the table in the {@link Catalog}.
+         *
+         * <p>This identifier defines the relationship between the table 
instance and the associated
+         * {@link Catalog} (if any), but it doesn't uniquely identify this 
specific table
+         * setup/instance across a table program, as the same table might be 
stored in different
+         * catalogs or, in case of anonymous tables, this identifier is 
auto-generated
+         * non-deterministic. Because of that behaviour, We strongly suggest 
using this identifier

Review comment:
       `behaviour, We` -> `behaviour, we`
   
   Similar comment as before: try to split long sentences. Long nested 
sentences are uncommon in English:
   
   ```
   This identifier defines the relationship between the table instance and the 
associated {@link Catalog} (if any). However, it doesn't uniquely identify this 
specific table setup/instance across a table program. The same table might be 
stored in different catalogs or, in case of anonymous tables, this identifier 
is auto-generated and non-deterministic.
   ```

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/RexNodeExpression.java
##########
@@ -108,4 +109,23 @@ public DataType getOutputDataType() {
     public String toString() {
         return asSummaryString();
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RexNodeExpression that = (RexNodeExpression) o;
+        return Objects.equals(rexNode, that.rexNode)

Review comment:
       nit: `rexNode` and `outputDataType` cannot be null, so they can call 
`equals` directly instead of going through `Objects.equals`.
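
   A minimal sketch of this suggestion, assuming both fields are rejected as 
null in the constructor. `SketchExpression` and its `String` fields are 
hypothetical stand-ins for `RexNodeExpression`, `rexNode` and `outputDataType`; 
this is not the code from the PR.

```java
import java.util.Objects;

// Hypothetical stand-in for RexNodeExpression. The String fields replace the
// real RexNode and DataType types so that the sketch compiles on its own.
final class SketchExpression {
    private final String rexNode;
    private final String outputDataType;

    SketchExpression(String rexNode, String outputDataType) {
        // Assumption: null is rejected here, which is what makes the direct
        // equals() calls below safe.
        this.rexNode = Objects.requireNonNull(rexNode, "rexNode");
        this.outputDataType = Objects.requireNonNull(outputDataType, "outputDataType");
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        SketchExpression that = (SketchExpression) o;
        // Neither field can be null, so Objects.equals is not needed.
        return rexNode.equals(that.rexNode) && outputDataType.equals(that.outputDataType);
    }

    @Override
    public int hashCode() {
        return Objects.hash(rexNode, outputDataType);
    }
}
```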

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/AggregatePushDownSpec.java
##########
@@ -204,4 +205,26 @@ public static boolean apply(
         }
         return aggExpressions;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {

Review comment:
       Mark this class, and the other classes touched by this commit, as 
`final`?
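
   A minimal sketch of how `final` fits with a `getClass()`-based `equals`. 
The motivation given in the code comments is one possible reading, not 
something stated in this review. `SketchSpecBase` and `SketchPushDownSpec` are 
hypothetical names, not classes from the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical base class standing in for the spec's superclass.
abstract class SketchSpecBase {
    private final String producedType;

    SketchSpecBase(String producedType) {
        this.producedType = Objects.requireNonNull(producedType, "producedType");
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        return producedType.equals(((SketchSpecBase) o).producedType);
    }

    @Override
    public int hashCode() {
        return producedType.hashCode();
    }
}

// final: no subclass can add state that this equals() would not compare.
final class SketchPushDownSpec extends SketchSpecBase {
    private final List<String> groupingKeys;

    SketchPushDownSpec(String producedType, List<String> groupingKeys) {
        super(producedType);
        this.groupingKeys = Collections.unmodifiableList(new ArrayList<>(groupingKeys));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        if (!super.equals(o)) {
            return false;
        }
        return groupingKeys.equals(((SketchPushDownSpec) o).groupingKeys);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), groupingKeys);
    }
}
```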

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
##########
@@ -80,7 +80,25 @@
     @PublicEvolving
     interface Context {
 
-        /** Returns the identifier of the table in the {@link Catalog}. */
+        /**
+         * Returns the identifier of the table in the {@link Catalog}.
+         *
+         * <p>This identifier defines the relationship between the table 
instance and the associated
+         * {@link Catalog} (if any), but it doesn't uniquely identify this 
specific table
+         * setup/instance across a table program, as the same table might be 
stored in different
+         * catalogs or, in case of anonymous tables, this identifier is 
auto-generated
+         * non-deterministic. Because of that behaviour, We strongly suggest 
using this identifier
+         * only for debugging purpose, and rely on user input for uniquely 
identifying a "table
+         * instance".
+         *
+         * <p>For example, when implementing a Kafka source using consumer 
groups, the user should
+         * provide the consumer group id manually rather than using this 
identifier as the consumer
+         * group id, so the offset tracking remains stable even if this table 
is anonymous, or it's
+         * moved to another {@link Catalog}.
+         *
+         * <p>Note that for anonymous tables {@link 
ObjectIdentifier#asSerializableString()} will
+         * fail, so we suggest to use {@link 
ObjectIdentifier#asSummaryString()} for debugging.

Review comment:
       `debugging` -> `printing and logging`
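
   A minimal sketch of the Kafka example from the Javadoc above: the consumer 
group id comes from user input, and the identifier only appears in a message. 
`ConsumerGroupSketch` and the `group.id` option key are assumptions made for 
illustration; they are not part of the Flink or Kafka connector API.

```java
import java.util.Map;

// Hypothetical helper, not part of Flink.
final class ConsumerGroupSketch {
    // Assumed option key, chosen only for this sketch.
    static final String GROUP_ID_OPTION = "group.id";

    // identifierSummary stands in for the result of
    // ObjectIdentifier#asSummaryString() and is used for printing only.
    static String resolveGroupId(Map<String, String> userOptions, String identifierSummary) {
        String groupId = userOptions.get(GROUP_ID_OPTION);
        if (groupId == null || groupId.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Option '" + GROUP_ID_OPTION + "' is required for table " + identifierSummary);
        }
        // The group id never depends on the identifier, so offset tracking
        // stays stable for anonymous tables and across catalog moves.
        return groupId;
    }
}
```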

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonSerializer.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.catalog.Column;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.serializeOptionalField;
+
+class ColumnJsonSerializer extends StdSerializer<Column> {
+
+    public static final String COLUMN_TYPE = "type";
+    public static final String COLUMN_TYPE_PHYSICAL = "physical";
+    public static final String COLUMN_TYPE_COMPUTED = "computed";
+    public static final String COLUMN_TYPE_METADATA = "metadata";
+    public static final String NAME = "name";
+    public static final String DATA_TYPE = "dataType";
+    public static final String COMMENT = "comment";
+    public static final String EXPRESSION = "expression";
+    public static final String METADATA_KEY = "metadataKey";
+    public static final String IS_VIRTUAL = "isVirtual";
+
+    public ColumnJsonSerializer() {
+        super(Column.class);
+    }
+
+    @Override
+    public void serialize(
+            Column column, JsonGenerator jsonGenerator, SerializerProvider 
serializerProvider)
+            throws IOException {
+        jsonGenerator.writeStartObject();
+
+        // Common fields
+        jsonGenerator.writeStringField(NAME, column.getName());
+        serializeOptionalField(jsonGenerator, COMMENT, column.getComment(), 
serializerProvider);
+
+        if (column instanceof Column.PhysicalColumn) {
+            serialize((Column.PhysicalColumn) column, jsonGenerator, 
serializerProvider);
+        } else if (column instanceof Column.MetadataColumn) {
+            serialize((Column.MetadataColumn) column, jsonGenerator, 
serializerProvider);
+        } else {

Review comment:
       Use an explicit `else if` here instead of the bare `else`.
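
   A minimal sketch of one reading of this suggestion, assuming the bare 
`else` currently handles computed columns: every known kind gets its own 
branch, and anything else fails loudly. All class names below are hypothetical 
stand-ins for `Column` and its subclasses.

```java
// Hypothetical column hierarchy, used only to keep the sketch self-contained.
abstract class SketchColumn {}

final class SketchPhysicalColumn extends SketchColumn {}

final class SketchMetadataColumn extends SketchColumn {}

final class SketchComputedColumn extends SketchColumn {}

final class SketchUnknownColumn extends SketchColumn {}

final class ColumnKindSketch {
    static String kindOf(SketchColumn column) {
        if (column instanceof SketchPhysicalColumn) {
            return "physical";
        } else if (column instanceof SketchMetadataColumn) {
            return "metadata";
        } else if (column instanceof SketchComputedColumn) {
            return "computed";
        } else {
            // A column class added later is reported instead of being
            // silently treated as a computed column.
            throw new IllegalArgumentException(
                    "Unsupported column class: " + column.getClass().getName());
        }
    }
}
```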

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonDeserializer.java
##########
@@ -0,0 +1,117 @@
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.ObjectCodec;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COLUMN_TYPE;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COLUMN_TYPE_COMPUTED;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COLUMN_TYPE_METADATA;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COLUMN_TYPE_PHYSICAL;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COMMENT;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.DATA_TYPE;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.EXPRESSION;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.IS_VIRTUAL;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.METADATA_KEY;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.NAME;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.deserializeOptionalField;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.traverse;
+
+class ColumnJsonDeserializer extends StdDeserializer<Column> {
+
+    private static final String[] SUPPORTED_COLUMN_TYPES =

Review comment:
       nit: I would recommend `KIND` instead of `TYPE`; it makes discussions 
easier.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java
##########
@@ -36,32 +37,56 @@
 import javax.annotation.Nullable;
 
 import java.util.List;
+import java.util.Objects;
 
 /**
  * {@link DynamicTableSourceSpec} describes how to serialize/deserialize 
dynamic table sink table
  * and create {@link DynamicTableSink} from the deserialization result.
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 @JsonInclude(JsonInclude.Include.NON_EMPTY)
-public class DynamicTableSinkSpec extends CatalogTableSpecBase {
+public class DynamicTableSinkSpec {
 
+    public static final String FIELD_NAME_CATALOG_TABLE_SPEC = "catalogTable";

Review comment:
       Rename to `FIELD_NAME_CATALOG_TABLE`, because it is not a spec.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonSerializer.java
##########
@@ -0,0 +1,86 @@
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ExternalCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * This serializer can be configured via an attribute to serialize or not the 
options and comments,
+ * setting the attribute {@link #SERIALIZE_OPTIONS} to {@code true} or {@code 
false}.
+ */
+class ResolvedCatalogTableJsonSerializer extends 
StdSerializer<ResolvedCatalogTable> {
+    private static final long serialVersionUID = 1L;
+
+    static final String SERIALIZE_OPTIONS = "serialize_options";
+
+    public static final String RESOLVED_SCHEMA = "resolvedSchema";
+    public static final String PARTITION_KEYS = "partitionKeys";
+    public static final String OPTIONS = "options";
+    public static final String COMMENT = "comment";
+
+    public ResolvedCatalogTableJsonSerializer() {
+        super(ResolvedCatalogTable.class);
+    }
+
+    @Override
+    public void serialize(
+            ResolvedCatalogTable resolvedCatalogTable,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        // This should never happen anyway, but we keep this assertion as a 
sanity check
+        assert resolvedCatalogTable.getTableKind() == 
CatalogBaseTable.TableKind.TABLE;
+
+        boolean serializeOptions =

Review comment:
       I find it confusing to have a second way of reading configuration. Why 
not use the `SerdeContext` instead? When people search for usages of the 
table option, they will not find the usages that go through a second config 
stack.

##########
File path: 
flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out
##########
@@ -1,76 +1,110 @@
 {
-   "flinkVersion":"",
-   "nodes":[
-      {
-         
"class":"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
-         "scanTableSource":{
-            "identifier":{
-               "catalogName":"default_catalog",
-               "databaseName":"default_database",
-               "tableName":"MyTable"
+  "flinkVersion": "",
+  "nodes": [
+    {
+      "class": 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
+      "scanTableSource": {
+        "catalogTable": {
+          "identifier": "`default_catalog`.`default_database`.`MyTable`",
+          "catalogTable": {
+            "resolvedSchema": {
+              "columns": [
+                {
+                  "name": "a",
+                  "type": "physical",
+                  "dataType": "BIGINT"
+                },
+                {
+                  "name": "b",
+                  "type": "physical",

Review comment:
       Let's still try to comply with 
`CatalogPropertiesUtil#serializeCatalogTable`. E.g., we can omit `"type": 
"physical"`.

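   A minimal sketch of omitting the default kind, assuming `physical` is 
treated as the default on both the write and the read side. 
`ColumnKindDefaultSketch` and the map-based representation are hypothetical; 
the sketch does not reproduce `CatalogPropertiesUtil` or the Jackson 
serializers from the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that models a JSON object as an ordered map.
final class ColumnKindDefaultSketch {
    static final String KIND_FIELD = "type";
    static final String KIND_PHYSICAL = "physical";

    // Writes the kind only when it differs from the default.
    static Map<String, String> write(String name, String kind, String dataType) {
        Map<String, String> json = new LinkedHashMap<>();
        json.put("name", name);
        if (!KIND_PHYSICAL.equals(kind)) {
            json.put(KIND_FIELD, kind);
        }
        json.put("dataType", dataType);
        return json;
    }

    // Falls back to the default when the field is absent.
    static String readKind(Map<String, String> json) {
        return json.getOrDefault(KIND_FIELD, KIND_PHYSICAL);
    }
}
```

   With this shape, a physical column is written with only `name` and 
`dataType`, while computed and metadata columns still carry their kind.
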
##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java
##########
@@ -0,0 +1,220 @@
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ContextResolvedTableJsonSerializer.FIELD_NAME_CATALOG_TABLE;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ContextResolvedTableJsonSerializer.FIELD_NAME_IDENTIFIER;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedCatalogTableJsonSerializer.OPTIONS;
+
+class ContextResolvedTableJsonDeserializer extends 
StdDeserializer<ContextResolvedTable> {
+    private static final long serialVersionUID = 1L;
+
+    public ContextResolvedTableJsonDeserializer() {
+        super(ContextResolvedTable.class);
+    }
+
+    @Override
+    public ContextResolvedTable deserialize(JsonParser jsonParser, 
DeserializationContext ctx)
+            throws IOException {
+        final CatalogPlanRestore planRestoreOption =
+                SerdeContext.get(ctx)
+                        .getConfiguration()
+                        .get(TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS);
+        final CatalogManager catalogManager =
+                SerdeContext.get(ctx).getFlinkContext().getCatalogManager();
+        final ObjectNode objectNode = jsonParser.readValueAsTree();
+
+        // Deserialize the two fields, if available
+        final ObjectIdentifier identifier =
+                JsonSerdeUtil.deserializeOptionalField(
+                                objectNode,
+                                FIELD_NAME_IDENTIFIER,
+                                ObjectIdentifier.class,
+                                jsonParser.getCodec(),
+                                ctx)
+                        .orElse(null);
+        ResolvedCatalogTable resolvedCatalogTable =
+                JsonSerdeUtil.deserializeOptionalField(
+                                objectNode,
+                                FIELD_NAME_CATALOG_TABLE,
+                                ResolvedCatalogTable.class,
+                                jsonParser.getCodec(),
+                                ctx)
+                        .orElse(null);
+
+        if (identifier == null && resolvedCatalogTable == null) {
+            throw new ValidationException(
+                    String.format(
+                            "The input json is invalid because it doesn't 
contain '%s', nor the '%s'.",
+                            FIELD_NAME_IDENTIFIER, FIELD_NAME_CATALOG_TABLE));
+        }
+
+        if (identifier == null) {
+            if (isLookupForced(planRestoreOption)) {
+                throw missingIdentifier();
+            }
+            return ContextResolvedTable.anonymous(resolvedCatalogTable);
+        }
+
+        Optional<ContextResolvedTable> contextResolvedTableFromCatalog =
+                isLookupEnabled(planRestoreOption)
+                        ? catalogManager.getTable(identifier)
+                        : Optional.empty();
+
+        // If we have a schema from the plan and from the catalog, we need to 
check they match.
+        if (contextResolvedTableFromCatalog.isPresent() && 
resolvedCatalogTable != null) {
+            ResolvedSchema schemaFromPlan = 
resolvedCatalogTable.getResolvedSchema();
+            ResolvedSchema schemaFromCatalog =
+                    contextResolvedTableFromCatalog.get().getResolvedSchema();
+            if (!areResolvedSchemasEqual(schemaFromPlan, schemaFromCatalog)) {
+                throw schemaNotMatching(identifier, schemaFromPlan, 
schemaFromCatalog);
+            }
+        }
+
+        if (resolvedCatalogTable == null || isLookupForced(planRestoreOption)) 
{
+            if (!isLookupEnabled(planRestoreOption)) {
+                throw lookupDisabled(identifier);
+            }
+            // We use what is stored inside the catalog
+            return contextResolvedTableFromCatalog.orElseThrow(
+                    () -> missingTableFromCatalog(identifier));
+        }
+
+        if (contextResolvedTableFromCatalog.isPresent()) {
+            // If no config map is present, then the ContextResolvedTable was 
serialized with
+            // SCHEMA, so we just need to return the catalog query result
+            if (objectNode.at("/" + FIELD_NAME_CATALOG_TABLE + "/" + 
OPTIONS).isMissingNode()) {
+                return contextResolvedTableFromCatalog.get();
+            }
+
+            return contextResolvedTableFromCatalog
+                    .flatMap(ContextResolvedTable::getCatalog)
+                    .map(c -> ContextResolvedTable.permanent(identifier, c, 
resolvedCatalogTable))
+                    .orElseGet(
+                            () -> ContextResolvedTable.temporary(identifier, 
resolvedCatalogTable));
+        }
+
+        return ContextResolvedTable.temporary(identifier, 
resolvedCatalogTable);
+    }
+
+    private boolean areResolvedSchemasEqual(
+            ResolvedSchema schemaFromPlan, ResolvedSchema schemaFromCatalog) {
+        // For schema equality we check:
+        //  * Columns size and order
+        //  * For each column: name, kind (class) and type
+        //  * Primary key equality
+        @SuppressWarnings("rawtypes")
+        List<Tuple3<String, Class, DataType>> columnsFromPlan =
+                schemaFromPlan.getColumns().stream()
+                        .map(c -> Tuple3.of(c.getName(), (Class) c.getClass(), 
c.getDataType()))
+                        .collect(Collectors.toList());
+
+        @SuppressWarnings("rawtypes")
+        List<Tuple3<String, Class, DataType>> columnsFromCatalog =
+                schemaFromCatalog.getColumns().stream()
+                        .map(c -> Tuple3.of(c.getName(), (Class) c.getClass(), 
c.getDataType()))
+                        .collect(Collectors.toList());
+
+        return Objects.equals(columnsFromPlan, columnsFromCatalog)
+                && Objects.equals(
+                        schemaFromPlan.getPrimaryKey(), 
schemaFromCatalog.getPrimaryKey());
+    }
+
+    private boolean isLookupForced(CatalogPlanRestore planRestoreOption) {
+        return planRestoreOption == CatalogPlanRestore.IDENTIFIER;
+    }
+
+    private boolean isLookupEnabled(CatalogPlanRestore planRestoreOption) {
+        return planRestoreOption != CatalogPlanRestore.ALL_ENFORCED;
+    }
+
+    static ValidationException missingIdentifier() {
+        return new ValidationException(
+                String.format(
+                        "The ContextResolvedTable cannot be deserialized, as 
no identifier is present within the json, "
+                                + "but lookup is forced by '%s' == '%s'. "
+                                + "Either allow restoring table from the 
catalog with '%s' == '%s' | '%s' or make sure you don't use anonymous tables 
when generating the plan.",
+                        TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS.key(),
+                        CatalogPlanRestore.IDENTIFIER.name(),
+                        TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS.key(),
+                        CatalogPlanRestore.ALL.name(),
+                        CatalogPlanRestore.ALL_ENFORCED.name()));
+    }
+
+    static ValidationException lookupDisabled(ObjectIdentifier 
objectIdentifier) {
+        return new ValidationException(
+                String.format(
+                        "The ContextResolvedTable with identifier %s does not 
contain any %s field, "
+                                + "but lookup is disabled because option '%s' 
== '%s'. "
+                                + "Either enable the catalog lookup with '%s' 
== '%s' | '%s' or regenerate the plan with '%s' != '%s'.",
+                        objectIdentifier,
+                        FIELD_NAME_CATALOG_TABLE,
+                        TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS.key(),
+                        CatalogPlanRestore.ALL_ENFORCED.name(),
+                        TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS.key(),
+                        CatalogPlanRestore.IDENTIFIER.name(),
+                        CatalogPlanRestore.ALL.name(),
+                        TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS.key(),
+                        
TableConfigOptions.CatalogPlanCompilation.IDENTIFIER.name()));
+    }
+
+    static ValidationException schemaNotMatching(
+            ObjectIdentifier objectIdentifier,
+            ResolvedSchema schemaFromPlan,
+            ResolvedSchema schemaFromCatalog) {
+        return new ValidationException(
+                String.format(
+                        "The schema of the table '%s' from the persisted plan 
does not match the schema loaded from the catalog: '%s' != '%s'. "
+                                + "Have you modified the table schema in the 
catalog before restoring the plan?.",
+                        objectIdentifier, schemaFromPlan, schemaFromCatalog));
+    }
+
+    static ValidationException missingTableFromCatalog(ObjectIdentifier 
objectIdentifier) {
+        return new ValidationException(
+                String.format(
+                        "CatalogManager cannot resolve the table with 
identifier %s and ContextResolvedTable does not contain any %s field. "

Review comment:
       Don't expose internal classes such as `CatalogManager` or 
`ContextResolvedTable` in exception messages. The same applies to the messages 
above.
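
   A minimal sketch of a message that describes the problem in user terms. 
The wording, the `RestoreErrorMessageSketch` class and the `optionKey` 
parameter are hypothetical; the actual message should be decided in the PR.

```java
// Hypothetical helper, not part of the PR.
final class RestoreErrorMessageSketch {
    // optionKey is a placeholder for the key of the relevant table option.
    static String missingTable(String identifier, String optionKey) {
        return String.format(
                "Cannot find table '%s' in the catalog, and the persisted plan does not "
                        + "contain the table definition. Make sure the table exists in the "
                        + "catalog, or check the value of option '%s'.",
                identifier, optionKey);
    }
}
```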

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonSerializer.java
##########
@@ -0,0 +1,98 @@
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import 
org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/** JSON serializer for {@link ContextResolvedTable}. */
+class ContextResolvedTableJsonSerializer extends 
StdSerializer<ContextResolvedTable> {
+    private static final long serialVersionUID = 1L;
+
+    public static final String FIELD_NAME_IDENTIFIER = "identifier";
+    public static final String FIELD_NAME_CATALOG_TABLE = "catalogTable";
+
+    public ContextResolvedTableJsonSerializer() {
+        super(ContextResolvedTable.class);
+    }
+
+    @Override
+    public void serialize(
+            ContextResolvedTable contextResolvedTable,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        final CatalogPlanCompilation planCompilationOption =
+                SerdeContext.get(serializerProvider)
+                        .getConfiguration()
+                        .get(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS);
+
+        if (contextResolvedTable.isAnonymous()
+                && planCompilationOption == CatalogPlanCompilation.IDENTIFIER) 
{
+            throw 
cannotSerializeAnonymousTable(contextResolvedTable.getIdentifier());
+        }
+
+        jsonGenerator.writeStartObject();
+
+        if (!contextResolvedTable.isAnonymous()) {
+            // Serialize object identifier
+            jsonGenerator.writeObjectField(
+                    FIELD_NAME_IDENTIFIER, 
contextResolvedTable.getIdentifier());
+        }
+
+        if ((contextResolvedTable.isPermanent() || 
contextResolvedTable.isAnonymous())
+                && planCompilationOption != CatalogPlanCompilation.IDENTIFIER) 
{
+            // Pass to the ResolvedCatalogTableJsonSerializer the option to 
serialize or not the
+            // identifier
+            serializerProvider.setAttribute(

Review comment:
       I really find it weird that a serializer sets attributes.

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
##########
@@ -3,28 +3,117 @@
   "nodes" : [ {
     "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
     "scanTableSource" : {
-      "identifier" : {
-        "catalogName" : "default_catalog",
-        "databaseName" : "default_database",
-        "tableName" : "MyTable"
-      },
       "catalogTable" : {
-        "schema.watermark.0.strategy.expr" : "`rowtime` - INTERVAL '1' SECOND",
-        "schema.4.expr" : "PROCTIME()",
-        "schema.0.data-type" : "INT",
-        "schema.2.name" : "c",
-        "schema.1.name" : "b",
-        "schema.4.name" : "proctime",
-        "schema.1.data-type" : "BIGINT",
-        "schema.3.data-type" : "TIMESTAMP(3)",
-        "schema.2.data-type" : "VARCHAR(2147483647)",
-        "schema.3.name" : "rowtime",
-        "connector" : "values",
-        "schema.watermark.0.rowtime" : "rowtime",
-        "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)",
-        "schema.3.expr" : "TO_TIMESTAMP(`c`)",
-        "schema.4.data-type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL",
-        "schema.0.name" : "a"
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "catalogTable" : {
+          "resolvedSchema" : {
+            "columns" : [ {
+              "name" : "a",
+              "type" : "physical",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "type" : "physical",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "type" : "physical",
+              "dataType" : {
+                "logicalType" : "VARCHAR(2147483647)",
+                "conversionClass" : "java.lang.String"
+              }
+            }, {
+              "name" : "rowtime",
+              "type" : "computed",
+              "expression" : {
+                "type" : "rexNodeExpression",

Review comment:
       Let's remove `"type" : "rexNodeExpression"` until we have something else 
than `RexNodeExpression`.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonSerializer.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.expressions.RexNodeExpression;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+class ResolvedExpressionJsonSerializer extends 
StdSerializer<ResolvedExpression> {
+
+    public static final String TYPE = "type";
+    public static final String TYPE_REX_NODE_EXPRESSION = "rexNodeExpression";
+    public static final String REX_NODE = "rexNode";
+    public static final String OUTPUT_DATA_TYPE = "outputDataType";
+    public static final String SERIALIZABLE_STRING = "serializableString";
+
+    protected ResolvedExpressionJsonSerializer() {
+        super(ResolvedExpression.class);
+    }
+
+    @Override
+    public void serialize(
+            ResolvedExpression resolvedExpression,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        jsonGenerator.writeStartObject();
+
+        if (resolvedExpression instanceof RexNodeExpression) {
+            serialize((RexNodeExpression) resolvedExpression, jsonGenerator, 
serializerProvider);
+        } else {
+            throw new ValidationException(
+                    String.format(
+                            "Expression '%s' cannot be serialized. "
+                                    + "Currently, only SQL expressions can be 
serialized in the persisted plan.",
+                            resolvedExpression.asSummaryString()));
+        }
+
+        jsonGenerator.writeEndObject();
+    }
+
+    private void serialize(
+            RexNodeExpression expression,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        jsonGenerator.writeStringField(TYPE, TYPE_REX_NODE_EXPRESSION);
+        serializerProvider.defaultSerializeField(REX_NODE, 
expression.getRexNode(), jsonGenerator);
+        serializerProvider.defaultSerializeField(

Review comment:
       Can't we even derive the type from the `RexNode`? I have the feeling we 
are duplicating a lot. Actually, `serializableString` and `rexNode` are also 
kind of duplicated, and they are actually not required anymore as they are 
persisted in the operator following the plan.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java
##########
@@ -36,32 +37,56 @@
 import javax.annotation.Nullable;
 
 import java.util.List;
+import java.util.Objects;
 
 /**
  * {@link DynamicTableSourceSpec} describes how to serialize/deserialize 
dynamic table sink table
  * and create {@link DynamicTableSink} from the deserialization result.
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 @JsonInclude(JsonInclude.Include.NON_EMPTY)
-public class DynamicTableSinkSpec extends CatalogTableSpecBase {
+public class DynamicTableSinkSpec {
 
+    public static final String FIELD_NAME_CATALOG_TABLE_SPEC = "catalogTable";
     public static final String FIELD_NAME_SINK_ABILITY_SPECS = 
"sinkAbilitySpecs";
 
-    @JsonIgnore private DynamicTableSink tableSink;
-
-    @JsonProperty(FIELD_NAME_SINK_ABILITY_SPECS)
+    private final ContextResolvedTable contextResolvedTable;
     private final @Nullable List<SinkAbilitySpec> sinkAbilitySpecs;
 
+    @JsonIgnore private DynamicTableSink tableSink;
+    @JsonIgnore private ClassLoader classLoader;

Review comment:
       Why do we need the class loader and configuration here? Can't we access 
them from other contexts? It seems wrong to me that every instance has 
references to those.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonSerializer.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.catalog.Column;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.serializeOptionalField;
+
+class ColumnJsonSerializer extends StdSerializer<Column> {
+
+    public static final String COLUMN_TYPE = "type";
+    public static final String COLUMN_TYPE_PHYSICAL = "physical";
+    public static final String COLUMN_TYPE_COMPUTED = "computed";
+    public static final String COLUMN_TYPE_METADATA = "metadata";
+    public static final String NAME = "name";
+    public static final String DATA_TYPE = "dataType";
+    public static final String COMMENT = "comment";
+    public static final String EXPRESSION = "expression";
+    public static final String METADATA_KEY = "metadataKey";
+    public static final String IS_VIRTUAL = "isVirtual";
+
+    public ColumnJsonSerializer() {
+        super(Column.class);
+    }
+
+    @Override
+    public void serialize(
+            Column column, JsonGenerator jsonGenerator, SerializerProvider 
serializerProvider)
+            throws IOException {
+        jsonGenerator.writeStartObject();
+
+        // Common fields
+        jsonGenerator.writeStringField(NAME, column.getName());
+        serializeOptionalField(jsonGenerator, COMMENT, column.getComment(), 
serializerProvider);
+
+        if (column instanceof Column.PhysicalColumn) {
+            serialize((Column.PhysicalColumn) column, jsonGenerator, 
serializerProvider);
+        } else if (column instanceof Column.MetadataColumn) {
+            serialize((Column.MetadataColumn) column, jsonGenerator, 
serializerProvider);
+        } else {
+            serialize((Column.ComputedColumn) column, jsonGenerator, 
serializerProvider);
+        }
+
+        jsonGenerator.writeEndObject();
+    }
+
+    private void serialize(

Review comment:
       `static`

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonDeserializer.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.ObjectCodec;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COLUMN_TYPE;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COLUMN_TYPE_COMPUTED;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COLUMN_TYPE_METADATA;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COLUMN_TYPE_PHYSICAL;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COMMENT;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.DATA_TYPE;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.EXPRESSION;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.IS_VIRTUAL;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.METADATA_KEY;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.NAME;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.deserializeOptionalField;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.traverse;
+
+class ColumnJsonDeserializer extends StdDeserializer<Column> {
+
+    private static final String[] SUPPORTED_COLUMN_TYPES =
+            new String[] {COLUMN_TYPE_PHYSICAL, COLUMN_TYPE_COMPUTED, 
COLUMN_TYPE_METADATA};
+
+    public ColumnJsonDeserializer() {
+        super(Column.class);
+    }
+
+    @Override
+    public Column deserialize(JsonParser jsonParser, DeserializationContext 
ctx)
+            throws IOException {
+        ObjectNode jsonNode = jsonParser.readValueAsTree();
+        String columnName = jsonNode.required(NAME).asText();
+        String columnType = jsonNode.required(COLUMN_TYPE).asText();
+
+        Column column;
+        switch (columnType) {
+            case COLUMN_TYPE_PHYSICAL:
+                column =
+                        deserializePhysicalColumn(columnName, jsonNode, 
jsonParser.getCodec(), ctx);
+                break;
+            case COLUMN_TYPE_COMPUTED:
+                column =
+                        deserializeComputedColumn(columnName, jsonNode, 
jsonParser.getCodec(), ctx);
+                break;
+            case COLUMN_TYPE_METADATA:
+                column =
+                        deserializeMetadataColumn(columnName, jsonNode, 
jsonParser.getCodec(), ctx);
+                break;
+            default:
+                throw new ValidationException(
+                        String.format(
+                                "Cannot recognize column type '%s'. Allowed 
types: %s.",
+                                columnType, 
Arrays.toString(SUPPORTED_COLUMN_TYPES)));
+        }
+        return column.withComment(
+                deserializeOptionalField(
+                                jsonNode, COMMENT, String.class, 
jsonParser.getCodec(), ctx)
+                        .orElse(null));
+    }
+
+    public Column.PhysicalColumn deserializePhysicalColumn(

Review comment:
       `private static`

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java
##########
@@ -136,13 +137,74 @@ public LookupTableSource 
getLookupTableSource(FlinkContext flinkContext) {
         }
     }
 
-    public void setTableSource(DynamicTableSource tableSource) {
-        this.tableSource = tableSource;
+    @JsonGetter(FIELD_NAME_CATALOG_TABLE_SPEC)
+    public ContextResolvedTable getContextResolvedTable() {
+        return contextResolvedTable;
     }
 
-    @JsonIgnore
+    @JsonGetter(FIELD_NAME_SOURCE_ABILITY_SPECS)
     @Nullable
     public List<SourceAbilitySpec> getSourceAbilitySpecs() {
         return sourceAbilitySpecs;
     }
+
+    @JsonIgnore
+    public ClassLoader getClassLoader() {
+        return classLoader;
+    }
+
+    @JsonIgnore
+    public ReadableConfig getReadableConfig() {
+        return configuration;
+    }
+
+    public void setTableSource(DynamicTableSource tableSource) {
+        this.tableSource = tableSource;
+    }
+
+    public void setClassLoader(ClassLoader classLoader) {

Review comment:
       We should definitely try to make specs immutable.
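   
   A minimal sketch of what an immutable spec could look like, using only the 
   JDK. `ImmutableSpecSketch` and all of its members are hypothetical and are 
   not the actual `DynamicTableSourceSpec`. The assumption is that all state 
   can be fixed in the constructor and that the class loader can be passed per 
   call instead of being stored through a setter.
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.Objects;
   
   /** Hypothetical stand-in for a spec class; not the real Flink spec. */
   final class ImmutableSpecSketch {
   
       private final String tableIdentifier;
       private final List<String> abilitySpecs;
   
       ImmutableSpecSketch(String tableIdentifier, List<String> abilitySpecs) {
           this.tableIdentifier = Objects.requireNonNull(tableIdentifier);
           // Defensive copy: later changes to the caller's list do not leak in.
           this.abilitySpecs =
                   Collections.unmodifiableList(new ArrayList<>(abilitySpecs));
       }
   
       String getTableIdentifier() {
           return tableIdentifier;
       }
   
       List<String> getAbilitySpecs() {
           return abilitySpecs;
       }
   
       /**
        * The class loader is a call argument, so the spec never holds a
        * reference to it and needs no setter.
        */
       boolean canLoad(String className, ClassLoader classLoader) {
           try {
               Class.forName(className, false, classLoader);
               return true;
           } catch (ClassNotFoundException e) {
               return false;
           }
       }
   }
   ```
   
   With this shape there are only getters, so two callers sharing one spec 
   cannot observe each other's changes.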

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonDeserializer.java
##########
@@ -42,15 +38,28 @@ public ObjectIdentifierJsonDeserializer() {
 
     @Override
     public ObjectIdentifier deserialize(JsonParser jsonParser, 
DeserializationContext ctx)
-            throws IOException, JsonProcessingException {
-        final JsonNode identifierNode = jsonParser.readValueAsTree();
-        return deserialize(identifierNode);
+            throws IOException {
+        return deserialize(jsonParser.getValueAsString(), 
SerdeContext.get(ctx));
     }
 
-    public static ObjectIdentifier deserialize(JsonNode identifierNode) {
+    static ObjectIdentifier deserialize(String identifierStr, SerdeContext 
ctx) {

Review comment:
       This could have been a JIRA issue and PR on its own. It is better to 
fork PRs instead of having these 13K-line PRs that are hard to review.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonSerializer.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ExternalCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * This serializer can be configured via an attribute to serialize or not the 
options and comments,
+ * setting the attribute {@link #SERIALIZE_OPTIONS} to {@code true} or {@code 
false}.
+ */
+class ResolvedCatalogTableJsonSerializer extends 
StdSerializer<ResolvedCatalogTable> {
+    private static final long serialVersionUID = 1L;
+
+    static final String SERIALIZE_OPTIONS = "serialize_options";
+
+    public static final String RESOLVED_SCHEMA = "resolvedSchema";
+    public static final String PARTITION_KEYS = "partitionKeys";
+    public static final String OPTIONS = "options";
+    public static final String COMMENT = "comment";
+
+    public ResolvedCatalogTableJsonSerializer() {
+        super(ResolvedCatalogTable.class);
+    }
+
+    @Override
+    public void serialize(
+            ResolvedCatalogTable resolvedCatalogTable,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        // Thia should never happen anyway, but we keep this assertion for 
sanity check
+        assert resolvedCatalogTable.getTableKind() == 
CatalogBaseTable.TableKind.TABLE;
+
+        boolean serializeOptions =
+                serializerProvider.getAttribute(SERIALIZE_OPTIONS) == null
+                        || (boolean) 
serializerProvider.getAttribute(SERIALIZE_OPTIONS);
+
+        jsonGenerator.writeStartObject();
+
+        if (resolvedCatalogTable.getOrigin() instanceof ExternalCatalogTable) {
+            throw new ValidationException(
+                    "Cannot serialize the table as it's an external inline 
table. "
+                            + "This might be caused by a usage of "
+                            + "StreamTableEnvironment#fromDataStream or 
TableResult#collect, "
+                            + "which are not supported by the persisted plan");

Review comment:
       nit: add a dot at the end of exception messages

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java
##########
@@ -81,19 +84,19 @@ private DynamicTableSource getTableSource(FlinkContext 
flinkContext) {
 
             tableSource =
                     FactoryUtil.createDynamicTableSource(
-                            // TODO Support creating from a catalog
                             factory,
-                            objectIdentifier,
-                            catalogTable,
+                            contextResolvedTable.getIdentifier(),
+                            contextResolvedTable.getResolvedTable(),
+                            SpecUtil.loadOptionsFromCatalogTable(

Review comment:
       We should not add too many utils. I think this method would be a good 
candidate for an upper class `DynamicTableSpec`.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonSerializer.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.expressions.RexNodeExpression;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+class ResolvedExpressionJsonSerializer extends 
StdSerializer<ResolvedExpression> {
+
+    public static final String TYPE = "type";
+    public static final String TYPE_REX_NODE_EXPRESSION = "rexNodeExpression";
+    public static final String REX_NODE = "rexNode";
+    public static final String OUTPUT_DATA_TYPE = "outputDataType";
+    public static final String SERIALIZABLE_STRING = "serializableString";
+
+    protected ResolvedExpressionJsonSerializer() {
+        super(ResolvedExpression.class);
+    }
+
+    @Override
+    public void serialize(
+            ResolvedExpression resolvedExpression,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        jsonGenerator.writeStartObject();
+
+        if (resolvedExpression instanceof RexNodeExpression) {
+            serialize((RexNodeExpression) resolvedExpression, jsonGenerator, 
serializerProvider);
+        } else {
+            throw new ValidationException(
+                    String.format(
+                            "Expression '%s' cannot be serialized. "
+                                    + "Currently, only SQL expressions can be 
serialized in the persisted plan.",
+                            resolvedExpression.asSummaryString()));
+        }
+
+        jsonGenerator.writeEndObject();
+    }
+
+    private void serialize(
+            RexNodeExpression expression,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        jsonGenerator.writeStringField(TYPE, TYPE_REX_NODE_EXPRESSION);
+        serializerProvider.defaultSerializeField(REX_NODE, 
expression.getRexNode(), jsonGenerator);
+        serializerProvider.defaultSerializeField(

Review comment:
       Only serialize the logical type; this should be enough. It might have 
been a mistake to let `Expression` return `DataType`, but we should not let 
this mistake bubble into the persisted plan.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.catalog.Constraint.ConstraintType;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+abstract class UniqueConstraintMixin {

Review comment:
       In the JavaDocs, link to the class that this mixin references.

##########
File path: 
flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out
##########
@@ -1,76 +1,110 @@
 {
-   "flinkVersion":"",
-   "nodes":[
-      {
-         
"class":"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
-         "scanTableSource":{
-            "identifier":{
-               "catalogName":"default_catalog",
-               "databaseName":"default_database",
-               "tableName":"MyTable"
+  "flinkVersion": "",
+  "nodes": [
+    {
+      "class": 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
+      "scanTableSource": {
+        "catalogTable": {
+          "identifier": "`default_catalog`.`default_database`.`MyTable`",
+          "catalogTable": {
+            "resolvedSchema": {
+              "columns": [
+                {
+                  "name": "a",
+                  "type": "physical",
+                  "dataType": "BIGINT"
+                },
+                {
+                  "name": "b",
+                  "type": "physical",

Review comment:
       The type `computed` is implicit if there is an expression, and 
`metadata` is implicit if there is a metadata key.
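   
   A rough sketch of that inference, using a plain `Map` in place of the JSON 
   node. The field names `expression` and `metadataKey` are the constants from 
   `ColumnJsonSerializer` in this PR; `ColumnKindSketch` and `inferKind` are 
   hypothetical names. It assumes every metadata column writes a `metadataKey`; 
   if that key is optional, some other marker would be needed to tell a 
   metadata column from a physical one.
   
   ```java
   import java.util.Map;
   
   /** Hypothetical helper; not part of the PR. */
   final class ColumnKindSketch {
   
       private ColumnKindSketch() {}
   
       /** Derives the column kind from which fields are present. */
       static String inferKind(Map<String, ?> columnJson) {
           if (columnJson.containsKey("expression")) {
               return "computed";
           }
           // Assumption: metadata columns always carry a metadataKey field.
           if (columnJson.containsKey("metadataKey")) {
               return "metadata";
           }
           return "physical";
       }
   }
   ```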

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ContextResolvedTableJsonSerializer.FIELD_NAME_CATALOG_TABLE;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ContextResolvedTableJsonSerializer.FIELD_NAME_IDENTIFIER;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedCatalogTableJsonSerializer.OPTIONS;
+
+class ContextResolvedTableJsonDeserializer extends 
StdDeserializer<ContextResolvedTable> {
+    private static final long serialVersionUID = 1L;
+
+    public ContextResolvedTableJsonDeserializer() {
+        super(ContextResolvedTable.class);
+    }
+
+    @Override
+    public ContextResolvedTable deserialize(JsonParser jsonParser, 
DeserializationContext ctx)
+            throws IOException {
+        final CatalogPlanRestore planRestoreOption =
+                SerdeContext.get(ctx)
+                        .getConfiguration()
+                        .get(TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS);
+        final CatalogManager catalogManager =
+                SerdeContext.get(ctx).getFlinkContext().getCatalogManager();
+        final ObjectNode objectNode = jsonParser.readValueAsTree();
+
+        // Deserialize the two fields, if available
+        final ObjectIdentifier identifier =
+                JsonSerdeUtil.deserializeOptionalField(
+                                objectNode,
+                                FIELD_NAME_IDENTIFIER,
+                                ObjectIdentifier.class,
+                                jsonParser.getCodec(),
+                                ctx)
+                        .orElse(null);
+        ResolvedCatalogTable resolvedCatalogTable =
+                JsonSerdeUtil.deserializeOptionalField(
+                                objectNode,
+                                FIELD_NAME_CATALOG_TABLE,
+                                ResolvedCatalogTable.class,
+                                jsonParser.getCodec(),
+                                ctx)
+                        .orElse(null);
+
+        if (identifier == null && resolvedCatalogTable == null) {
+            throw new ValidationException(
+                    String.format(
+                            "The input json is invalid because it doesn't 
contain '%s', nor the '%s'.",
+                            FIELD_NAME_IDENTIFIER, FIELD_NAME_CATALOG_TABLE));
+        }
+
+        if (identifier == null) {
+            if (isLookupForced(planRestoreOption)) {
+                throw missingIdentifier();
+            }
+            return ContextResolvedTable.anonymous(resolvedCatalogTable);
+        }
+
+        Optional<ContextResolvedTable> contextResolvedTableFromCatalog =
+                isLookupEnabled(planRestoreOption)
+                        ? catalogManager.getTable(identifier)
+                        : Optional.empty();
+
+        // If we have a schema from the plan and from the catalog, we need to check they match.
+        if (contextResolvedTableFromCatalog.isPresent() && resolvedCatalogTable != null) {
+            ResolvedSchema schemaFromPlan = resolvedCatalogTable.getResolvedSchema();
+            ResolvedSchema schemaFromCatalog =
+                    contextResolvedTableFromCatalog.get().getResolvedSchema();
+            if (!areResolvedSchemasEqual(schemaFromPlan, schemaFromCatalog)) {
+                throw schemaNotMatching(identifier, schemaFromPlan, schemaFromCatalog);
+            }
+        }
+
+        if (resolvedCatalogTable == null || isLookupForced(planRestoreOption)) {
+            if (!isLookupEnabled(planRestoreOption)) {
+                throw lookupDisabled(identifier);
+            }
+            // We use what is stored inside the catalog
+            return contextResolvedTableFromCatalog.orElseThrow(
+                    () -> missingTableFromCatalog(identifier));
+        }
+
+        if (contextResolvedTableFromCatalog.isPresent()) {
+            // If no config map is present, then the ContextResolvedTable was serialized with
+            // SCHEMA, so we just need to return the catalog query result
+            if (objectNode.at("/" + FIELD_NAME_CATALOG_TABLE + "/" + OPTIONS).isMissingNode()) {
+                return contextResolvedTableFromCatalog.get();
+            }
+
+            return contextResolvedTableFromCatalog
+                    .flatMap(ContextResolvedTable::getCatalog)
+                    .map(c -> ContextResolvedTable.permanent(identifier, c, resolvedCatalogTable))
+                    .orElseGet(
+                            () -> ContextResolvedTable.temporary(identifier, resolvedCatalogTable));
+        }
+
+        return ContextResolvedTable.temporary(identifier, resolvedCatalogTable);
+    }
+
+    private boolean areResolvedSchemasEqual(
+            ResolvedSchema schemaFromPlan, ResolvedSchema schemaFromCatalog) {
+        // For schema equality we check:
+        //  * Columns size and order
+        //  * For each column: name, kind (class) and type
+        //  * Check partition keys set equality
+        @SuppressWarnings("rawtypes")
+        List<Tuple3<String, Class, DataType>> columnsFromPlan =

Review comment:
       Can we avoid using `Tuple3`? This is a class from the DataStream API, and there is actually a reason why Java has no tuples: usually there is a better alternative. In this case, a `for` loop with an "early out" would do.
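
A minimal sketch of the "early out" loop, for illustration only. It assumes a hypothetical stand-in class `Col` (name, kind, type as plain strings) instead of the real `ResolvedSchema` column classes, and it leaves out the partition-key check, which could be done separately with a set comparison.

```java
import java.util.Arrays;
import java.util.List;

public class SchemaEqualitySketch {

    /** Hypothetical stand-in for a resolved column; not a Flink class. */
    static final class Col {
        final String name;
        final String kind;
        final String type;

        Col(String name, String kind, String type) {
            this.name = name;
            this.kind = kind;
            this.type = type;
        }
    }

    /** Compares column count, order, name, kind and type; returns at the first mismatch. */
    static boolean areColumnsEqual(List<Col> fromPlan, List<Col> fromCatalog) {
        if (fromPlan.size() != fromCatalog.size()) {
            return false;
        }
        for (int i = 0; i < fromPlan.size(); i++) {
            Col p = fromPlan.get(i);
            Col c = fromCatalog.get(i);
            if (!p.name.equals(c.name) || !p.kind.equals(c.kind) || !p.type.equals(c.type)) {
                return false; // early out, no intermediate tuple list needed
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Col> plan =
                Arrays.asList(new Col("a", "physical", "BIGINT"), new Col("b", "physical", "INT"));
        List<Col> same =
                Arrays.asList(new Col("a", "physical", "BIGINT"), new Col("b", "physical", "INT"));
        List<Col> changed =
                Arrays.asList(new Col("a", "physical", "BIGINT"), new Col("b", "physical", "STRING"));

        System.out.println(areColumnsEqual(plan, same)); // true
        System.out.println(areColumnsEqual(plan, changed)); // false
    }
}
```

This avoids building two intermediate lists just to call `equals` on them, and the comparison stops as soon as one column differs.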

##########
File path: 
flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out
##########
@@ -1,76 +1,110 @@
 {
-   "flinkVersion":"",
-   "nodes":[
-      {
-         "class":"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
-         "scanTableSource":{
-            "identifier":{
-               "catalogName":"default_catalog",
-               "databaseName":"default_database",
-               "tableName":"MyTable"
+  "flinkVersion": "",
+  "nodes": [
+    {
+      "class": "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
+      "scanTableSource": {
+        "catalogTable": {
+          "identifier": "`default_catalog`.`default_database`.`MyTable`",
+          "catalogTable": {
+            "resolvedSchema": {
+              "columns": [
+                {
+                  "name": "a",
+                  "type": "physical",
+                  "dataType": "BIGINT"
+                },
+                {
+                  "name": "b",
+                  "type": "physical",
+                  "dataType": "INT"
+                },
+                {
+                  "name": "c",
+                  "type": "physical",
+                  "dataType": {
+                    "logicalType": "VARCHAR(2147483647)",

Review comment:
       Let's only serialize the logical type.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ConverterDelegatingDeserializer.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.BeanDescription;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationConfig;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.DelegatingDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Deserializer which delegates to the default {@link BeanDeserializer} and then executes custom
+ * code to perform a conversion to another final value.
+ *
+ * <p>Use the {@link Converter} when you want to use Jackson annotations for defining serializers
+ * and deserializers, but after the deserialization you need to perform an additional transformation
+ * step that doesn't depend on the original JSON, e.g. enrich the output value with info from {@link
+ * SerdeContext}.
+ */
+class ConverterDelegatingDeserializer<T, R> extends DelegatingDeserializer {

Review comment:
       I'm pretty sure we don't need this class. Let's have an offline chat about it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
