This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 8b68ca7 [FLINK-13005][hive] HiveCatalog should not add 'flink.is_generic' key for Hive table 8b68ca7 is described below commit 8b68ca769967f6c2035ceba6ebc25fe6b9988250 Author: bowen.li <bowenl...@gmail.com> AuthorDate: Thu Jun 27 10:43:31 2019 -0700 [FLINK-13005][hive] HiveCatalog should not add 'flink.is_generic' key for Hive table This PR fixes a bug where the 'flink.is_generic' key was added to the metadata of a non-generic catalog table (a.k.a. a Hive table). This closes #8904. --- .../apache/flink/table/catalog/hive/HiveCatalog.java | 20 +++++++++++--------- .../flink/table/catalog/config/CatalogConfig.java | 6 ++++++ .../apache/flink/table/catalog/CatalogTestUtil.java | 3 +++ 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index 237b105..8659a80 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -19,6 +19,7 @@ package org.apache.flink.table.catalog.hive; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.batch.connectors.hive.HiveTableFactory; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.AbstractCatalog; @@ -93,6 +94,7 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import static org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -107,11 +109,6 @@ public class HiveCatalog extends
AbstractCatalog { private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory(); private static final String DEFAULT_HIVE_TABLE_STORAGE_FORMAT = "TextFile"; - // Prefix used to distinguish properties created by Hive and Flink, - // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. - private static final String FLINK_PROPERTY_PREFIX = "flink."; - private static final String FLINK_PROPERTY_IS_GENERIC = FLINK_PROPERTY_PREFIX + CatalogConfig.IS_GENERIC; - // Prefix used to distinguish Flink functions from Hive functions. // It's appended to Flink function's class name // because Hive's Function object doesn't have properties or other place to store the flag for Flink functions. @@ -482,7 +479,7 @@ public class HiveCatalog extends AbstractCatalog { // Table properties Map<String, String> properties = hiveTable.getParameters(); - boolean isGeneric = Boolean.valueOf(properties.computeIfAbsent(FLINK_PROPERTY_IS_GENERIC, k -> String.valueOf(false))); + boolean isGeneric = Boolean.valueOf(properties.get(CatalogConfig.IS_GENERIC)); if (isGeneric) { properties = retrieveFlinkProperties(properties); } @@ -580,20 +577,25 @@ public class HiveCatalog extends AbstractCatalog { /** * Filter out Hive-created properties, and return Flink-created properties. + * Note that 'is_generic' is a special key and this method will leave it as-is. */ private static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) { return hiveTableParams.entrySet().stream() - .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX)) + .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX) || e.getKey().equals(CatalogConfig.IS_GENERIC)) .collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue())); } /** * Add a prefix to Flink-created properties to distinguish them from Hive-created properties. 
+ * Note that 'is_generic' is a special key and this method will leave it as-is. */ private static Map<String, String> maskFlinkProperties(Map<String, String> properties) { return properties.entrySet().stream() .filter(e -> e.getKey() != null && e.getValue() != null) - .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), e -> e.getValue())); + .map(e -> new Tuple2<>( + e.getKey().equals(CatalogConfig.IS_GENERIC) ? e.getKey() : FLINK_PROPERTY_PREFIX + e.getKey(), + e.getValue())) + .collect(Collectors.toMap(t -> t.f0, t -> t.f1)); } // ------ partitions ------ @@ -769,7 +771,7 @@ public class HiveCatalog extends AbstractCatalog { // make sure both table and partition are generic, or neither is private static void ensureTableAndPartitionMatch(Table hiveTable, CatalogPartition catalogPartition) { - boolean isGeneric = Boolean.valueOf(hiveTable.getParameters().get(FLINK_PROPERTY_IS_GENERIC)); + boolean isGeneric = Boolean.valueOf(hiveTable.getParameters().get(CatalogConfig.IS_GENERIC)); if ((isGeneric && catalogPartition instanceof HiveCatalogPartition) || (!isGeneric && catalogPartition instanceof GenericCatalogPartition)) { throw new CatalogException(String.format("Cannot handle %s partition for %s table", diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java index 05afa32..7a4a624 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java @@ -24,4 +24,10 @@ package org.apache.flink.table.catalog.config; public class CatalogConfig { public static final String IS_GENERIC = "is_generic"; + + // Globally reserved prefix for catalog properties. + // User-defined properties should not start with this prefix.
+ // Used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + public static final String FLINK_PROPERTY_PREFIX = "flink."; } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java index 905a44c..1c64025 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java @@ -33,6 +33,7 @@ import org.apache.flink.table.plan.stats.TableStats; import java.util.Map; +import static org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -57,6 +58,7 @@ public class CatalogTestUtil { if (Boolean.valueOf(t1.getProperties().get(CatalogConfig.IS_GENERIC))) { assertEquals(t1.getProperties(), t2.getProperties()); } else { + assertTrue(t2.getProperties().keySet().stream().noneMatch(k -> k.startsWith(FLINK_PROPERTY_PREFIX))); assertTrue(t2.getProperties().entrySet().containsAll(t1.getProperties().entrySet())); } } @@ -73,6 +75,7 @@ public class CatalogTestUtil { if (Boolean.valueOf(v1.getProperties().get(CatalogConfig.IS_GENERIC))) { assertEquals(v1.getProperties(), v2.getProperties()); } else { + assertTrue(v2.getProperties().keySet().stream().noneMatch(k -> k.startsWith(FLINK_PROPERTY_PREFIX))); assertTrue(v2.getProperties().entrySet().containsAll(v1.getProperties().entrySet())); } }