This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 00d69e53fe7b263293608f0a949525441b82e09e
Author: fengli <ldliu...@163.com>
AuthorDate: Wed Apr 24 18:11:03 2024 +0800

    [FLINK-35189][table-common] CatalogPropertiesUtil support serialize and 
deserialize materialized table
---
 .../flink/table/catalog/CatalogPropertiesUtil.java | 140 ++++++++++++++++++++-
 1 file changed, 138 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
index 909e8f1c562..9bb6280cc7d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
@@ -28,9 +28,12 @@ import 
org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.StringUtils;
 
 import javax.annotation.Nullable;
 
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -117,6 +120,56 @@ public final class CatalogPropertiesUtil {
         }
     }
 
+    /**
+     * Serializes the given {@link ResolvedCatalogMaterializedTable} into a 
map of string
+     * properties.
+     */
+    public static Map<String, String> serializeCatalogMaterializedTable(
+            ResolvedCatalogMaterializedTable resolvedMaterializedTable) {
+        try {
+            final Map<String, String> properties = new HashMap<>();
+
+            serializeResolvedSchema(properties, 
resolvedMaterializedTable.getResolvedSchema());
+
+            final String comment = resolvedMaterializedTable.getComment();
+            if (comment != null && comment.length() > 0) {
+                properties.put(COMMENT, comment);
+            }
+
+            final Optional<Long> snapshot = 
resolvedMaterializedTable.getSnapshot();
+            snapshot.ifPresent(snapshotId -> properties.put(SNAPSHOT, 
Long.toString(snapshotId)));
+
+            serializePartitionKeys(properties, 
resolvedMaterializedTable.getPartitionKeys());
+
+            properties.putAll(resolvedMaterializedTable.getOptions());
+
+            properties.put(DEFINITION_QUERY, 
resolvedMaterializedTable.getDefinitionQuery());
+            properties.put(FRESHNESS, 
resolvedMaterializedTable.getFreshness().toString());
+
+            properties.put(
+                    LOGICAL_REFRESH_MODE, 
resolvedMaterializedTable.getLogicalRefreshMode().name());
+            properties.put(REFRESH_MODE, 
resolvedMaterializedTable.getRefreshMode().name());
+            properties.put(REFRESH_STATUS, 
resolvedMaterializedTable.getRefreshStatus().name());
+
+            resolvedMaterializedTable
+                    .getRefreshHandlerDescription()
+                    .ifPresent(
+                            refreshHandlerDesc ->
+                                    properties.put(REFRESH_HANDLER_DESC, 
refreshHandlerDesc));
+            if (resolvedMaterializedTable.getSerializedRefreshHandler() != 
null) {
+                properties.put(
+                        REFRESH_HANDLER_BYTES,
+                        new String(
+                                
resolvedMaterializedTable.getSerializedRefreshHandler(),
+                                StandardCharsets.UTF_8));
+            }
+
+            return properties;
+        } catch (Exception e) {
+            throw new CatalogException("Error in serializing catalog 
materialized table.", e);
+        }
+    }
+
     /** Deserializes the given map of string properties into an unresolved 
{@link CatalogTable}. */
     public static CatalogTable deserializeCatalogTable(Map<String, String> 
properties) {
         return deserializeCatalogTable(properties, null);
@@ -128,7 +181,7 @@ public final class CatalogPropertiesUtil {
      * @param properties The properties to deserialize from
      * @param fallbackKey The fallback key to get the schema properties. This 
is meant to support
      *     the old table (1.10) deserialization
-     * @return
+     * @return a catalog table instance.
      */
     public static CatalogTable deserializeCatalogTable(
             Map<String, String> properties, @Nullable String fallbackKey) {
@@ -158,6 +211,64 @@ public final class CatalogPropertiesUtil {
         }
     }
 
+    /**
+     * Deserializes the given map of string properties into an unresolved 
{@link
+     * CatalogMaterializedTable}.
+     */
+    public static CatalogMaterializedTable deserializeCatalogMaterializedTable(
+            Map<String, String> properties) {
+        try {
+            final Schema schema = deserializeSchema(properties, SCHEMA);
+
+            final @Nullable String comment = properties.get(COMMENT);
+
+            final @Nullable Long snapshot =
+                    properties.containsKey(SNAPSHOT)
+                            ? getValue(properties, SNAPSHOT, Long::parseLong)
+                            : null;
+
+            final List<String> partitionKeys = 
deserializePartitionKeys(properties);
+
+            final Map<String, String> options = deserializeOptions(properties, 
SCHEMA);
+
+            final String definitionQuery = properties.get(DEFINITION_QUERY);
+            final Duration freshness = 
Duration.parse(properties.get(FRESHNESS));
+
+            final CatalogMaterializedTable.LogicalRefreshMode 
logicalRefreshMode =
+                    CatalogMaterializedTable.LogicalRefreshMode.valueOf(
+                            properties.get(LOGICAL_REFRESH_MODE));
+            final CatalogMaterializedTable.RefreshMode refreshMode =
+                    
CatalogMaterializedTable.RefreshMode.valueOf(properties.get(REFRESH_MODE));
+            final CatalogMaterializedTable.RefreshStatus refreshStatus =
+                    
CatalogMaterializedTable.RefreshStatus.valueOf(properties.get(REFRESH_STATUS));
+
+            final @Nullable String refreshHandlerDesc = 
properties.get(REFRESH_HANDLER_DESC);
+            final @Nullable String refreshHandlerStringBytes =
+                    properties.get(REFRESH_HANDLER_BYTES);
+            final @Nullable byte[] refreshHandlerBytes =
+                    
StringUtils.isNullOrWhitespaceOnly(refreshHandlerStringBytes)
+                            ? null
+                            : 
refreshHandlerStringBytes.getBytes(StandardCharsets.UTF_8);
+
+            CatalogMaterializedTable.Builder builder = 
CatalogMaterializedTable.newBuilder();
+            builder.schema(schema)
+                    .comment(comment)
+                    .partitionKeys(partitionKeys)
+                    .options(options)
+                    .snapshot(snapshot)
+                    .definitionQuery(definitionQuery)
+                    .freshness(freshness)
+                    .logicalRefreshMode(logicalRefreshMode)
+                    .refreshMode(refreshMode)
+                    .refreshStatus(refreshStatus)
+                    .refreshHandlerDescription(refreshHandlerDesc)
+                    .serializedRefreshHandler(refreshHandlerBytes);
+            return builder.build();
+        } catch (Exception e) {
+            throw new CatalogException("Error in deserializing catalog 
materialized table.", e);
+        }
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Helper methods and constants
     // 
--------------------------------------------------------------------------------------------
@@ -205,6 +316,20 @@ public final class CatalogPropertiesUtil {
 
     private static final String SNAPSHOT = "snapshot";
 
+    private static final String DEFINITION_QUERY = "definition-query";
+
+    private static final String FRESHNESS = "freshness";
+
+    private static final String LOGICAL_REFRESH_MODE = "logical-refresh-mode";
+
+    private static final String REFRESH_MODE = "refresh-mode";
+
+    private static final String REFRESH_STATUS = "refresh-status";
+
+    private static final String REFRESH_HANDLER_DESC = "refresh-handler-desc";
+
+    private static final String REFRESH_HANDLER_BYTES = 
"refresh-handler-bytes";
+
     private static Map<String, String> deserializeOptions(
             Map<String, String> map, String schemaKey) {
         return map.entrySet().stream()
@@ -214,11 +339,22 @@ public final class CatalogPropertiesUtil {
                             return !key.startsWith(schemaKey + SEPARATOR)
                                     && !key.startsWith(PARTITION_KEYS + 
SEPARATOR)
                                     && !key.equals(COMMENT)
-                                    && !key.equals(SNAPSHOT);
+                                    && !key.equals(SNAPSHOT)
+                                    && !isMaterializedTableAttribute(key);
                         })
                 .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
     }
 
+    private static boolean isMaterializedTableAttribute(String key) {
+        return key.equals(DEFINITION_QUERY)
+                || key.equals(FRESHNESS)
+                || key.equals(LOGICAL_REFRESH_MODE)
+                || key.equals(REFRESH_MODE)
+                || key.equals(REFRESH_STATUS)
+                || key.equals(REFRESH_HANDLER_DESC)
+                || key.equals(REFRESH_HANDLER_BYTES);
+    }
+
     private static List<String> deserializePartitionKeys(Map<String, String> 
map) {
         final int partitionCount = getCount(map, PARTITION_KEYS, NAME);
         final List<String> partitionKeys = new ArrayList<>();

Reply via email to