This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 00d69e53fe7b263293608f0a949525441b82e09e Author: fengli <ldliu...@163.com> AuthorDate: Wed Apr 24 18:11:03 2024 +0800 [FLINK-35189][table-common] CatalogPropertiesUtil support serialize and deserialize materialized table --- .../flink/table/catalog/CatalogPropertiesUtil.java | 140 ++++++++++++++++++++- 1 file changed, 138 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java index 909e8f1c562..9bb6280cc7d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java @@ -28,9 +28,12 @@ import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.StringUtils; import javax.annotation.Nullable; +import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -117,6 +120,56 @@ public final class CatalogPropertiesUtil { } } + /** + * Serializes the given {@link ResolvedCatalogMaterializedTable} into a map of string + * properties. + */ + public static Map<String, String> serializeCatalogMaterializedTable( + ResolvedCatalogMaterializedTable resolvedMaterializedTable) { + try { + final Map<String, String> properties = new HashMap<>(); + + serializeResolvedSchema(properties, resolvedMaterializedTable.getResolvedSchema()); + + final String comment = resolvedMaterializedTable.getComment(); + if (comment != null && comment.length() > 0) { + properties.put(COMMENT, comment); + } + + final Optional<Long> snapshot = resolvedMaterializedTable.getSnapshot(); + snapshot.ifPresent(snapshotId -> properties.put(SNAPSHOT, Long.toString(snapshotId))); + + serializePartitionKeys(properties, resolvedMaterializedTable.getPartitionKeys()); + + properties.putAll(resolvedMaterializedTable.getOptions()); + + properties.put(DEFINITION_QUERY, resolvedMaterializedTable.getDefinitionQuery()); + properties.put(FRESHNESS, resolvedMaterializedTable.getFreshness().toString()); + + properties.put( + LOGICAL_REFRESH_MODE, resolvedMaterializedTable.getLogicalRefreshMode().name()); + properties.put(REFRESH_MODE, resolvedMaterializedTable.getRefreshMode().name()); + properties.put(REFRESH_STATUS, resolvedMaterializedTable.getRefreshStatus().name()); + + resolvedMaterializedTable + .getRefreshHandlerDescription() + .ifPresent( + refreshHandlerDesc -> + properties.put(REFRESH_HANDLER_DESC, refreshHandlerDesc)); + if (resolvedMaterializedTable.getSerializedRefreshHandler() != null) { + properties.put( + REFRESH_HANDLER_BYTES, + new String( + resolvedMaterializedTable.getSerializedRefreshHandler(), + StandardCharsets.UTF_8)); + } + + return properties; + } catch (Exception e) { + throw new CatalogException("Error in serializing catalog materialized table.", e); + } + } + /** Deserializes the given map of string properties into an unresolved {@link CatalogTable}. */ public static CatalogTable deserializeCatalogTable(Map<String, String> properties) { return deserializeCatalogTable(properties, null); @@ -128,7 +181,7 @@ public final class CatalogPropertiesUtil { * @param properties The properties to deserialize from * @param fallbackKey The fallback key to get the schema properties. This is meant to support * the old table (1.10) deserialization - * @return + * @return a catalog table instance. */ public static CatalogTable deserializeCatalogTable( Map<String, String> properties, @Nullable String fallbackKey) { @@ -158,6 +211,64 @@ public final class CatalogPropertiesUtil { } } + /** + * Deserializes the given map of string properties into an unresolved {@link + * CatalogMaterializedTable}. + */ + public static CatalogMaterializedTable deserializeCatalogMaterializedTable( + Map<String, String> properties) { + try { + final Schema schema = deserializeSchema(properties, SCHEMA); + + final @Nullable String comment = properties.get(COMMENT); + + final @Nullable Long snapshot = + properties.containsKey(SNAPSHOT) + ? getValue(properties, SNAPSHOT, Long::parseLong) + : null; + + final List<String> partitionKeys = deserializePartitionKeys(properties); + + final Map<String, String> options = deserializeOptions(properties, SCHEMA); + + final String definitionQuery = properties.get(DEFINITION_QUERY); + final Duration freshness = Duration.parse(properties.get(FRESHNESS)); + + final CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode = + CatalogMaterializedTable.LogicalRefreshMode.valueOf( + properties.get(LOGICAL_REFRESH_MODE)); + final CatalogMaterializedTable.RefreshMode refreshMode = + CatalogMaterializedTable.RefreshMode.valueOf(properties.get(REFRESH_MODE)); + final CatalogMaterializedTable.RefreshStatus refreshStatus = + CatalogMaterializedTable.RefreshStatus.valueOf(properties.get(REFRESH_STATUS)); + + final @Nullable String refreshHandlerDesc = properties.get(REFRESH_HANDLER_DESC); + final @Nullable String refreshHandlerStringBytes = + properties.get(REFRESH_HANDLER_BYTES); + final @Nullable byte[] refreshHandlerBytes = + StringUtils.isNullOrWhitespaceOnly(refreshHandlerStringBytes) + ? null + : refreshHandlerStringBytes.getBytes(StandardCharsets.UTF_8); + + CatalogMaterializedTable.Builder builder = CatalogMaterializedTable.newBuilder(); + builder.schema(schema) + .comment(comment) + .partitionKeys(partitionKeys) + .options(options) + .snapshot(snapshot) + .definitionQuery(definitionQuery) + .freshness(freshness) + .logicalRefreshMode(logicalRefreshMode) + .refreshMode(refreshMode) + .refreshStatus(refreshStatus) + .refreshHandlerDescription(refreshHandlerDesc) + .serializedRefreshHandler(refreshHandlerBytes); + return builder.build(); + } catch (Exception e) { + throw new CatalogException("Error in deserializing catalog materialized table.", e); + } + } + // -------------------------------------------------------------------------------------------- // Helper methods and constants // -------------------------------------------------------------------------------------------- @@ -205,6 +316,20 @@ public final class CatalogPropertiesUtil { private static final String SNAPSHOT = "snapshot"; + private static final String DEFINITION_QUERY = "definition-query"; + + private static final String FRESHNESS = "freshness"; + + private static final String LOGICAL_REFRESH_MODE = "logical-refresh-mode"; + + private static final String REFRESH_MODE = "refresh-mode"; + + private static final String REFRESH_STATUS = "refresh-status"; + + private static final String REFRESH_HANDLER_DESC = "refresh-handler-desc"; + + private static final String REFRESH_HANDLER_BYTES = "refresh-handler-bytes"; + private static Map<String, String> deserializeOptions( Map<String, String> map, String schemaKey) { return map.entrySet().stream() @@ -214,11 +339,22 @@ public final class CatalogPropertiesUtil { return !key.startsWith(schemaKey + SEPARATOR) && !key.startsWith(PARTITION_KEYS + SEPARATOR) && !key.equals(COMMENT) - && !key.equals(SNAPSHOT); + && !key.equals(SNAPSHOT) + && !isMaterializedTableAttribute(key); }) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } + private static boolean isMaterializedTableAttribute(String key) { + return key.equals(DEFINITION_QUERY) + || key.equals(FRESHNESS) + || key.equals(LOGICAL_REFRESH_MODE) + || key.equals(REFRESH_MODE) + || key.equals(REFRESH_STATUS) + || key.equals(REFRESH_HANDLER_DESC) + || key.equals(REFRESH_HANDLER_BYTES); + } + private static List<String> deserializePartitionKeys(Map<String, String> map) { final int partitionCount = getCount(map, PARTITION_KEYS, NAME); final List<String> partitionKeys = new ArrayList<>();