This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d81374  [FLINK-13012][hive] Handle default partition name of Hive 
table
1d81374 is described below

commit 1d81374ce344e7bde410792c8a2efd6f4e707b9f
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Jul 11 21:08:22 2019 +0800

    [FLINK-13012][hive] Handle default partition name of Hive table
    
    This closes #9088
---
 .../connectors/hive/HiveTableOutputFormat.java     | 15 +++-
 .../batch/connectors/hive/HiveTableSource.java     | 11 ++-
 .../connectors/hive/TableEnvHiveConnectorTest.java | 88 ++++++++++++++++++++++
 3 files changed, 109 insertions(+), 5 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
index acb1bf9..ade5830 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
@@ -141,6 +141,9 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
        // to convert Flink object to Hive object
        private transient HiveObjectConversion[] hiveConversions;
 
+       // used when partition values is null or empty
+       private transient String defaultPartitionName;
+
        public HiveTableOutputFormat(JobConf jobConf, ObjectPath tablePath, 
CatalogTable table, HiveTablePartition hiveTablePartition,
                                                                Properties 
tableProperties, boolean overwrite) {
                super(jobConf.getCredentials());
@@ -298,6 +301,8 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                        rowObjectInspector = 
ObjectInspectorFactory.getStandardStructObjectInspector(
                                
Arrays.asList(rowTypeInfo.getFieldNames()).subList(0, rowTypeInfo.getArity() - 
partitionColumns.size()),
                                objectInspectors);
+                       defaultPartitionName = 
jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+                                       
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
                }
        }
 
@@ -310,10 +315,12 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                                // only need to check the dynamic partitions
                                final int numStaticPart = 
hiveTablePartition.getPartitionSpec().size();
                                for (int i = dynamicPartitionOffset; i < 
record.getArity(); i++) {
-                                       // TODO: seems Hive also just calls 
toString(), need further investigation to confirm
-                                       // TODO: validate partition value
-                                       String partVal = 
record.getField(i).toString();
-                                       dynPartSpec.put(partitionColumns.get(i 
- dynamicPartitionOffset + numStaticPart), partVal);
+                                       Object field = record.getField(i);
+                                       String partitionValue = field != null ? 
field.toString() : null;
+                                       if (partitionValue == null || 
partitionValue.isEmpty()) {
+                                               partitionValue = 
defaultPartitionName;
+                                       }
+                                       dynPartSpec.put(partitionColumns.get(i 
- dynamicPartitionOffset + numStaticPart), partitionValue);
                                }
                                String partName = 
Warehouse.makePartPath(dynPartSpec);
                                partitionWriter = 
partitionToWriter.get(partName);
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
index 706442d..e0734f4 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
@@ -157,6 +157,8 @@ public class HiveTableSource extends 
InputFormatTableSource<Row> implements Part
                        String tableName = tablePath.getObjectName();
                        List<String> partitionColNames = 
catalogTable.getPartitionKeys();
                        if (partitionColNames != null && 
partitionColNames.size() > 0) {
+                               final String defaultPartitionName = 
jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+                                               
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
                                List<Partition> partitions =
                                                client.listPartitions(dbName, 
tableName, (short) -1);
                                for (Partition partition : partitions) {
@@ -168,7 +170,14 @@ public class HiveTableSource extends 
InputFormatTableSource<Row> implements Part
                                                String partitionValue = 
partition.getValues().get(i);
                                                
partitionSpec.put(partitionColName, partitionValue);
                                                DataType type = 
catalogTable.getSchema().getFieldDataType(partitionColName).get();
-                                               Object partitionObject = 
restorePartitionValueFromFromType(partitionValue, type);
+                                               Object partitionObject;
+                                               if 
(defaultPartitionName.equals(partitionValue)) {
+                                                       LogicalTypeRoot 
typeRoot = type.getLogicalType().getTypeRoot();
+                                                       // while this is inline 
with Hive, seems it should be null for string columns as well
+                                                       partitionObject = 
typeRoot == LogicalTypeRoot.CHAR || typeRoot == LogicalTypeRoot.VARCHAR ? 
defaultPartitionName : null;
+                                               } else {
+                                                       partitionObject = 
restorePartitionValueFromFromType(partitionValue, type);
+                                               }
                                                
partitionColValues.put(partitionColName, partitionObject);
                                        }
                                        HiveTablePartition hiveTablePartition = 
new HiveTablePartition(sd, partitionColValues);
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/TableEnvHiveConnectorTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/TableEnvHiveConnectorTest.java
new file mode 100644
index 0000000..2f910a9
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/TableEnvHiveConnectorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test hive connector with table API.
+ */
+@RunWith(FlinkStandaloneHiveRunner.class)
+public class TableEnvHiveConnectorTest {
+
+       @HiveSQL(files = {})
+       private static HiveShell hiveShell;
+
+       private static HiveCatalog hiveCatalog;
+       private static HiveMetastoreClientWrapper hmsClient;
+
+       @BeforeClass
+       public static void setup() {
+               HiveConf hiveConf = hiveShell.getHiveConf();
+               hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
+               hiveCatalog.open();
+               hmsClient = HiveMetastoreClientFactory.create(hiveConf, null);
+       }
+
+       @Test
+       public void testDefaultPartitionName() throws Exception {
+               hiveShell.execute("create database db1");
+               hiveShell.execute("create table db1.src (x int, y int)");
+               hiveShell.execute("create table db1.part (x int) partitioned by 
(y int)");
+               hiveShell.insertInto("db1", "src").addRow(1, 1).addRow(2, 
null).commit();
+
+               TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
+               tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+               tableEnv.useCatalog(hiveCatalog.getName());
+
+               // test generating partitions with default name
+               tableEnv.sqlUpdate("insert into db1.part select * from 
db1.src");
+               tableEnv.execute("mytest");
+               HiveConf hiveConf = hiveShell.getHiveConf();
+               String defaultPartName = 
hiveConf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
+               Table hiveTable = hmsClient.getTable("db1", "part");
+               Path defaultPartPath = new 
Path(hiveTable.getSd().getLocation(), "y=" + defaultPartName);
+               FileSystem fs = defaultPartPath.getFileSystem(hiveConf);
+               assertTrue(fs.exists(defaultPartPath));
+
+               // TODO: test reading from flink when 
https://issues.apache.org/jira/browse/FLINK-13279 is fixed
+               assertEquals(Arrays.asList("1\t1", "2\tNULL"), 
hiveShell.executeQuery("select * from db1.part"));
+
+               hiveShell.execute("drop database db1 cascade");
+       }
+}

Reply via email to