Airblader commented on a change in pull request #15098:
URL: https://github.com/apache/flink/pull/15098#discussion_r591496802



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogView.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableSchema;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of a {@link CatalogView}. */
+@Internal
+class DefaultCatalogView implements CatalogView {
+
+    private final Schema schema;
+    private final @Nullable String comment;
+    private final String originalQuery;
+    private final String expandedQuery;
+    private final Map<String, String> options;
+
+    DefaultCatalogView(
+            Schema schema,
+            @Nullable String comment,
+            String originalQuery,
+            String expandedQuery,
+            Map<String, String> options) {
+        this.schema = checkNotNull(schema, "Schema must not be null.");
+        this.comment = comment;
+        this.originalQuery = checkNotNull(originalQuery, "Original query must not be null.");
+        this.expandedQuery = checkNotNull(expandedQuery, "Expanded query must not be null.");
+        this.options = checkNotNull(options, "Options must not be null.");
+
+        checkArgument(
+                options.entrySet().stream()
+                        .allMatch(e -> e.getKey() != null && e.getValue() != null),
+                "Options cannot have null keys or values.");
+    }
+
+    // TODO uncomment
+    // @Override
+    public Schema getUnresolvedSchema() {
+        return schema;
+    }
+
+    @Override
+    public TableSchema getSchema() {
+        // TODO move to upper class
+        return null;
+    }
+
+    @Override
+    public String getComment() {
+        return comment != null ? comment : "";
+    }
+
+    @Override
+    public String getOriginalQuery() {
+        return originalQuery;
+    }
+
+    @Override
+    public String getExpandedQuery() {
+        return expandedQuery;
+    }
+
+    @Override
+    public Map<String, String> getOptions() {
+        return options;
+    }
+
+    @Override
+    public CatalogBaseTable copy() {
+        return this;

Review comment:
       This should also make a proper copy.
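       For illustration, a minimal sketch of such a copy (the constructor signature is taken from the diff above; copying the options map into a fresh `HashMap` is an assumption):

```java
@Override
public CatalogBaseTable copy() {
    // Sketch: return a new instance instead of `this`, and copy the mutable
    // options map so the copy does not share state with the original.
    // Assumes java.util.HashMap is imported.
    return new DefaultCatalogView(
            schema, comment, originalQuery, expandedQuery, new HashMap<>(options));
}
```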

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
##########
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Schema.Builder;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Column.ComputedColumn;
+import org.apache.flink.table.catalog.Column.MetadataColumn;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Utilities for de/serializing {@link Catalog} objects into a map of string properties. */
+@Internal
+public final class CatalogPropertiesUtil {
+
+    /**
+     * Flag to distinguish whether a meta-object is a generic Flink object or not.
+     *
+     * <p>It is used to distinguish between Flink's generic connector discovery logic and specialized
+     * catalog connectors.
+     */
+    public static final String IS_GENERIC = "is_generic";
+
+    /**
+     * Globally reserved prefix for catalog properties. User-defined properties should not use this
+     * prefix. E.g. it is used to distinguish properties created by Hive and Flink, as Hive
+     * metastore has its own properties created upon table creation and migration between different
+     * versions of metastore.
+     */
+    public static final String FLINK_PROPERTY_PREFIX = "flink.";
+
+    /** Serializes the given {@link ResolvedCatalogTable} into a map of string properties. */
+    public static Map<String, String> serializeCatalogTable(ResolvedCatalogTable resolvedTable) {
+        try {
+            final Map<String, String> properties = new HashMap<>();
+
+            serializeResolvedSchema(properties, resolvedTable.getResolvedSchema());
+
+            properties.put(COMMENT, resolvedTable.getComment());
+
+            serializePartitionKeys(properties, resolvedTable.getPartitionKeys());
+
+            properties.putAll(resolvedTable.getOptions());
+
+            properties.remove(IS_GENERIC); // reserved option
+
+            return properties;
+        } catch (Exception e) {
+            throw new CatalogException("Error in serializing catalog table.", 
e);
+        }
+    }
+
+    /** Deserializes the given map of string properties into an unresolved {@link CatalogTable}. */
+    public static CatalogTable deserializeCatalogTable(Map<String, String> properties) {
+        try {
+            final Schema schema = deserializeSchema(properties);
+
+            final @Nullable String comment = properties.get(COMMENT);
+
+            final List<String> partitionKeys = deserializePartitionKeys(properties);
+
+            final Map<String, String> options = deserializeOptions(properties);
+
+            return CatalogTable.of(schema, comment, partitionKeys, options);
+        } catch (Exception e) {
+            throw new CatalogException("Error in deserializing catalog 
table.", e);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Helper methods and constants
+    // --------------------------------------------------------------------------------------------
+
+    private static final String SCHEMA = "schema";
+
+    private static final String NAME = "name";
+
+    private static final String DATA_TYPE = "data-type";
+
+    private static final String EXPR = "expr";
+
+    private static final String METADATA = "metadata";
+
+    private static final String VIRTUAL = "virtual";
+
+    private static final String PARTITION_KEYS = "partition.keys";
+
+    private static final String WATERMARK = "watermark";
+
+    private static final String WATERMARK_ROWTIME = "rowtime";
+
+    private static final String WATERMARK_STRATEGY = "strategy";
+
+    private static final String WATERMARK_STRATEGY_EXPR = WATERMARK_STRATEGY + '.' + EXPR;
+
+    private static final String WATERMARK_STRATEGY_DATA_TYPE = WATERMARK_STRATEGY + '.' + DATA_TYPE;
+
+    private static final String PRIMARY_KEY_NAME = "primary-key.name";
+
+    private static final String PRIMARY_KEY_COLUMNS = "primary-key.columns";
+
+    private static final String COMMENT = "comment";
+
+    private static Map<String, String> deserializeOptions(Map<String, String> map) {
+        return map.entrySet().stream()
+                .filter(
+                        e -> {
+                            final String key = e.getKey();
+                            return !key.startsWith(SCHEMA + '.')
+                                    && !key.startsWith(PARTITION_KEYS + '.')
+                                    && !key.equals(COMMENT);
+                        })
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    private static List<String> deserializePartitionKeys(Map<String, String> map) {
+        final int partitionCount = getCount(map, PARTITION_KEYS, NAME);
+        final List<String> partitionKeys = new ArrayList<>();
+        for (int i = 0; i < partitionCount; i++) {
+            final String partitionNameKey = PARTITION_KEYS + '.' + i + '.' + NAME;
+            final String partitionName = getValue(map, partitionNameKey);
+            partitionKeys.add(partitionName);
+        }
+        return partitionKeys;
+    }
+
+    private static Schema deserializeSchema(Map<String, String> map) {
+        final Builder builder = Schema.newBuilder();
+
+        deserializeColumns(map, builder);
+
+        deserializeWatermark(map, builder);
+
+        deserializePrimaryKey(map, builder);
+
+        return builder.build();
+    }
+
+    private static void deserializePrimaryKey(Map<String, String> map, Builder builder) {
+        final String constraintNameKey = SCHEMA + '.' + PRIMARY_KEY_NAME;

Review comment:
       I think it's worth extracting the separator into a constant since it's widely used here. Maybe even just a small utility function that takes a list of strings and concatenates them with the separator in between (i.e., essentially calling `String.join(SEPARATOR, …)`).
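       A minimal sketch of that suggestion (`SEPARATOR` and `compoundKey` are assumed names; `Arrays` and `Collectors` are already imported in this file per the diff):

```java
private static final String SEPARATOR = ".";

// Hypothetical helper: concatenates key parts with the separator in between.
private static String compoundKey(Object... parts) {
    return Arrays.stream(parts)
            .map(Object::toString)
            .collect(Collectors.joining(SEPARATOR));
}

// The key above could then be built as:
// final String constraintNameKey = compoundKey(SCHEMA, PRIMARY_KEY_NAME);
```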




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

