This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new b6373cb  [FLINK-15234][table][hive] hive table created from flink catalog table cannot have null properties in parameters
b6373cb is described below

commit b6373cba04a9f45d6a851684f80785af1f94b489
Author: bowen.li <bowenl...@gmail.com>
AuthorDate: Fri Dec 13 11:52:03 2019 -0800

    [FLINK-15234][table][hive] hive table created from flink catalog table cannot have null properties in parameters
---
 .../flink/table/catalog/hive/HiveCatalog.java      |  4 +-
 .../flink/table/catalog/hive/HiveCatalogTest.java  | 80 ++++++++++++++++++++++
 .../flink/table/catalog/AbstractCatalogTable.java  |  6 ++
 3 files changed, 89 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index d4a069e..9770230 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -567,7 +567,9 @@ public class HiveCatalog extends AbstractCatalog {
 
                Map<String, String> properties = new HashMap<>(table.getProperties());
                // Table comment
-               properties.put(HiveCatalogConfig.COMMENT, table.getComment());
+               if (table.getComment() != null) {
+                       properties.put(HiveCatalogConfig.COMMENT, table.getComment());
+               }
 
                boolean isGeneric = Boolean.valueOf(properties.get(CatalogConfig.IS_GENERIC));
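
For context, CatalogTableImpl accepts a null comment, and before this guard that null ended up as a value in the Hive table's parameters map. A minimal sketch of the scenario against the public Catalog API (the names "catalog", "path" and "schema" below are placeholders, not part of this commit):

    // A Flink catalog table created without a comment (comment argument is null).
    CatalogTable table = new CatalogTableImpl(
            schema,
            new FileSystem().path("/test_path").toProperties(),
            null);

    // With the guard above, persisting the table through HiveCatalog simply omits the
    // comment entry instead of writing a null value into the table parameters.
    catalog.createTable(path, table, false);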
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
new file mode 100644
index 0000000..23ccb45
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.config.CatalogConfig;
+import org.apache.flink.table.descriptors.FileSystem;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for HiveCatalog.
+ */
+public class HiveCatalogTest {
+
+       TableSchema schema = TableSchema.builder()
+               .field("name", DataTypes.STRING())
+               .field("age", DataTypes.INT())
+               .build();
+
+       @Test
+       public void testCreateGenericTable() {
+               Table hiveTable = HiveCatalog.instantiateHiveTable(
+                       new ObjectPath("test", "test"),
+                       new CatalogTableImpl(
+                               schema,
+                               new FileSystem().path("/test_path").toProperties(),
+                               null
+                       ));
+
+               Map<String, String> prop = hiveTable.getParameters();
+               assertEquals(prop.remove(CatalogConfig.IS_GENERIC), String.valueOf("true"));
+               assertTrue(prop.keySet().stream().allMatch(k -> k.startsWith(CatalogConfig.FLINK_PROPERTY_PREFIX)));
+       }
+
+       @Test
+       public void testCreateHiveTable() {
+               Map<String, String> map = new HashMap<>(new FileSystem().path("/test_path").toProperties());
+
+               map.put(CatalogConfig.IS_GENERIC, String.valueOf(false));
+
+               Table hiveTable = HiveCatalog.instantiateHiveTable(
+                       new ObjectPath("test", "test"),
+                       new CatalogTableImpl(
+                               schema,
+                               map,
+                               null
+                       ));
+
+               Map<String, String> prop = hiveTable.getParameters();
+               assertEquals(prop.remove(CatalogConfig.IS_GENERIC), String.valueOf(false));
+               assertTrue(prop.keySet().stream().noneMatch(k -> k.startsWith(CatalogConfig.FLINK_PROPERTY_PREFIX)));
+       }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
index d586b5d..4f0f440 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -54,6 +55,11 @@ public abstract class AbstractCatalogTable implements CatalogTable {
                this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
                this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot be null");
                this.properties = checkNotNull(properties, "properties cannot be null");
+
+               checkArgument(
+                       properties.entrySet().stream().allMatch(e -> e.getKey() != null && e.getValue() != null),
+                       "properties cannot have null keys or values");
+
                this.comment = comment;
        }
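
The new precondition makes this class of problem fail fast when the catalog table is constructed, rather than surfacing later inside a specific catalog implementation. A rough usage sketch (the property key "connector" is only an illustrative placeholder):

    Map<String, String> props = new HashMap<>();
    props.put("connector", null);                      // a null value slips into the properties
    new CatalogTableImpl(schema, props, "a comment");  // now fails with IllegalArgumentException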
 
