This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b68ca7  [FLINK-13005][hive] HiveCatalog should not add 
'flink.is_generic' key for Hive table
8b68ca7 is described below

commit 8b68ca769967f6c2035ceba6ebc25fe6b9988250
Author: bowen.li <bowenl...@gmail.com>
AuthorDate: Thu Jun 27 10:43:31 2019 -0700

    [FLINK-13005][hive] HiveCatalog should not add 'flink.is_generic' key for 
Hive table
    
    This PR fixes a bug that 'flink.is_generic' key should not add for metadata 
for a non-generic catalog table (a.k.a a Hive table).
    
    This closes #8904.
---
 .../apache/flink/table/catalog/hive/HiveCatalog.java | 20 +++++++++++---------
 .../flink/table/catalog/config/CatalogConfig.java    |  6 ++++++
 .../apache/flink/table/catalog/CatalogTestUtil.java  |  3 +++
 3 files changed, 20 insertions(+), 9 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 237b105..8659a80 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.batch.connectors.hive.HiveTableFactory;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.AbstractCatalog;
@@ -93,6 +94,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -107,11 +109,6 @@ public class HiveCatalog extends AbstractCatalog {
        private static final StorageFormatFactory storageFormatFactory = new 
StorageFormatFactory();
        private static final String DEFAULT_HIVE_TABLE_STORAGE_FORMAT = 
"TextFile";
 
-       // Prefix used to distinguish properties created by Hive and Flink,
-       // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
-       private static final String FLINK_PROPERTY_PREFIX = "flink.";
-       private static final String FLINK_PROPERTY_IS_GENERIC = 
FLINK_PROPERTY_PREFIX + CatalogConfig.IS_GENERIC;
-
        // Prefix used to distinguish Flink functions from Hive functions.
        // It's appended to Flink function's class name
        // because Hive's Function object doesn't have properties or other 
place to store the flag for Flink functions.
@@ -482,7 +479,7 @@ public class HiveCatalog extends AbstractCatalog {
                // Table properties
                Map<String, String> properties = hiveTable.getParameters();
 
-               boolean isGeneric = 
Boolean.valueOf(properties.computeIfAbsent(FLINK_PROPERTY_IS_GENERIC, k -> 
String.valueOf(false)));
+               boolean isGeneric = 
Boolean.valueOf(properties.get(CatalogConfig.IS_GENERIC));
                if (isGeneric) {
                        properties = retrieveFlinkProperties(properties);
                }
@@ -580,20 +577,25 @@ public class HiveCatalog extends AbstractCatalog {
 
        /**
         * Filter out Hive-created properties, and return Flink-created 
properties.
+        * Note that 'is_generic' is a special key and this method will leave 
it as-is.
         */
        private static Map<String, String> retrieveFlinkProperties(Map<String, 
String> hiveTableParams) {
                return hiveTableParams.entrySet().stream()
-                       .filter(e -> 
e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+                       .filter(e -> 
e.getKey().startsWith(FLINK_PROPERTY_PREFIX) || 
e.getKey().equals(CatalogConfig.IS_GENERIC))
                        .collect(Collectors.toMap(e -> 
e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
        }
 
        /**
         * Add a prefix to Flink-created properties to distinguish them from 
Hive-created properties.
+        * Note that 'is_generic' is a special key and this method will leave 
it as-is.
         */
        private static Map<String, String> maskFlinkProperties(Map<String, 
String> properties) {
                return properties.entrySet().stream()
                        .filter(e -> e.getKey() != null && e.getValue() != null)
-                       .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + 
e.getKey(), e -> e.getValue()));
+                       .map(e -> new Tuple2<>(
+                               e.getKey().equals(CatalogConfig.IS_GENERIC) ? 
e.getKey() : FLINK_PROPERTY_PREFIX + e.getKey(),
+                               e.getValue()))
+                       .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
        }
 
        // ------ partitions ------
@@ -769,7 +771,7 @@ public class HiveCatalog extends AbstractCatalog {
 
        // make sure both table and partition are generic, or neither is
        private static void ensureTableAndPartitionMatch(Table hiveTable, 
CatalogPartition catalogPartition) {
-               boolean isGeneric = 
Boolean.valueOf(hiveTable.getParameters().get(FLINK_PROPERTY_IS_GENERIC));
+               boolean isGeneric = 
Boolean.valueOf(hiveTable.getParameters().get(CatalogConfig.IS_GENERIC));
                if ((isGeneric && catalogPartition instanceof 
HiveCatalogPartition) ||
                        (!isGeneric && catalogPartition instanceof 
GenericCatalogPartition)) {
                        throw new CatalogException(String.format("Cannot handle 
%s partition for %s table",
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
index 05afa32..7a4a624 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
@@ -24,4 +24,10 @@ package org.apache.flink.table.catalog.config;
 public class CatalogConfig {
 
        public static final String IS_GENERIC = "is_generic";
+
+       // Globally reserved prefix for catalog properties.
+       // User defined properties should not with this prefix.
+       // Used to distinguish properties created by Hive and Flink,
+       // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+       public static final String FLINK_PROPERTY_PREFIX = "flink.";
 }
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 905a44c..1c64025 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.plan.stats.TableStats;
 
 import java.util.Map;
 
+import static 
org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -57,6 +58,7 @@ public class CatalogTestUtil {
                if 
(Boolean.valueOf(t1.getProperties().get(CatalogConfig.IS_GENERIC))) {
                        assertEquals(t1.getProperties(), t2.getProperties());
                } else {
+                       
assertTrue(t2.getProperties().keySet().stream().noneMatch(k -> 
k.startsWith(FLINK_PROPERTY_PREFIX)));
                        
assertTrue(t2.getProperties().entrySet().containsAll(t1.getProperties().entrySet()));
                }
        }
@@ -73,6 +75,7 @@ public class CatalogTestUtil {
                if 
(Boolean.valueOf(v1.getProperties().get(CatalogConfig.IS_GENERIC))) {
                        assertEquals(v1.getProperties(), v2.getProperties());
                } else {
+                       
assertTrue(v2.getProperties().keySet().stream().noneMatch(k -> 
k.startsWith(FLINK_PROPERTY_PREFIX)));
                        
assertTrue(v2.getProperties().entrySet().containsAll(v1.getProperties().entrySet()));
                }
        }

Reply via email to