This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push: new 25e6143 [FLINK-13012][hive] Handle default partition name of Hive table 25e6143 is described below commit 25e6143a3c425c47851cbd69a41d68fe53f62f91 Author: Rui Li <li...@apache.org> AuthorDate: Thu Jul 11 21:08:22 2019 +0800 [FLINK-13012][hive] Handle default partition name of Hive table This closes #9088 --- .../connectors/hive/HiveTableOutputFormat.java | 15 +++- .../batch/connectors/hive/HiveTableSource.java | 11 ++- .../connectors/hive/TableEnvHiveConnectorTest.java | 88 ++++++++++++++++++++++ 3 files changed, 109 insertions(+), 5 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java index acb1bf9..ade5830 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java @@ -141,6 +141,9 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp // to convert Flink object to Hive object private transient HiveObjectConversion[] hiveConversions; + // used when partition values is null or empty + private transient String defaultPartitionName; + public HiveTableOutputFormat(JobConf jobConf, ObjectPath tablePath, CatalogTable table, HiveTablePartition hiveTablePartition, Properties tableProperties, boolean overwrite) { super(jobConf.getCredentials()); @@ -298,6 +301,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector( Arrays.asList(rowTypeInfo.getFieldNames()).subList(0, rowTypeInfo.getArity() - partitionColumns.size()), objectInspectors); + defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname, + HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal); } } @@ -310,10 +315,12 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp // only need to check the dynamic partitions final int numStaticPart = hiveTablePartition.getPartitionSpec().size(); for (int i = dynamicPartitionOffset; i < record.getArity(); i++) { - // TODO: seems Hive also just calls toString(), need further investigation to confirm - // TODO: validate partition value - String partVal = record.getField(i).toString(); - dynPartSpec.put(partitionColumns.get(i - dynamicPartitionOffset + numStaticPart), partVal); + Object field = record.getField(i); + String partitionValue = field != null ? field.toString() : null; + if (partitionValue == null || partitionValue.isEmpty()) { + partitionValue = defaultPartitionName; + } + dynPartSpec.put(partitionColumns.get(i - dynamicPartitionOffset + numStaticPart), partitionValue); } String partName = Warehouse.makePartPath(dynPartSpec); partitionWriter = partitionToWriter.get(partName); diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java index 706442d..e0734f4 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java @@ -157,6 +157,8 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part String tableName = tablePath.getObjectName(); List<String> partitionColNames = catalogTable.getPartitionKeys(); if (partitionColNames != null && partitionColNames.size() > 0) { + final String defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname, + HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal); List<Partition> partitions = client.listPartitions(dbName, tableName, (short) -1); for (Partition partition : partitions) { @@ -168,7 +170,14 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part String partitionValue = partition.getValues().get(i); partitionSpec.put(partitionColName, partitionValue); DataType type = catalogTable.getSchema().getFieldDataType(partitionColName).get(); - Object partitionObject = restorePartitionValueFromFromType(partitionValue, type); + Object partitionObject; + if (defaultPartitionName.equals(partitionValue)) { + LogicalTypeRoot typeRoot = type.getLogicalType().getTypeRoot(); + // while this is inline with Hive, seems it should be null for string columns as well + partitionObject = typeRoot == LogicalTypeRoot.CHAR || typeRoot == LogicalTypeRoot.VARCHAR ? defaultPartitionName : null; + } else { + partitionObject = restorePartitionValueFromFromType(partitionValue, type); + } partitionColValues.put(partitionColName, partitionObject); } HiveTablePartition hiveTablePartition = new HiveTablePartition(sd, partitionColValues); diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/TableEnvHiveConnectorTest.java new file mode 100644 index 0000000..2f910a9 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/TableEnvHiveConnectorTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.hive; + +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.catalog.hive.HiveCatalog; +import org.apache.flink.table.catalog.hive.HiveTestUtils; +import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory; +import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper; + +import com.klarna.hiverunner.HiveShell; +import com.klarna.hiverunner.annotations.HiveSQL; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Table; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test hive connector with table API. + */ +@RunWith(FlinkStandaloneHiveRunner.class) +public class TableEnvHiveConnectorTest { + + @HiveSQL(files = {}) + private static HiveShell hiveShell; + + private static HiveCatalog hiveCatalog; + private static HiveMetastoreClientWrapper hmsClient; + + @BeforeClass + public static void setup() { + HiveConf hiveConf = hiveShell.getHiveConf(); + hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf); + hiveCatalog.open(); + hmsClient = HiveMetastoreClientFactory.create(hiveConf, null); + } + + @Test + public void testDefaultPartitionName() throws Exception { + hiveShell.execute("create database db1"); + hiveShell.execute("create table db1.src (x int, y int)"); + hiveShell.execute("create table db1.part (x int) partitioned by (y int)"); + hiveShell.insertInto("db1", "src").addRow(1, 1).addRow(2, null).commit(); + + TableEnvironment tableEnv = HiveTestUtils.createTableEnv(); + tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog); + tableEnv.useCatalog(hiveCatalog.getName()); + + // test generating partitions with default name + tableEnv.sqlUpdate("insert into db1.part select * from db1.src"); + tableEnv.execute("mytest"); + HiveConf hiveConf = hiveShell.getHiveConf(); + String defaultPartName = hiveConf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME); + Table hiveTable = hmsClient.getTable("db1", "part"); + Path defaultPartPath = new Path(hiveTable.getSd().getLocation(), "y=" + defaultPartName); + FileSystem fs = defaultPartPath.getFileSystem(hiveConf); + assertTrue(fs.exists(defaultPartPath)); + + // TODO: test reading from flink when https://issues.apache.org/jira/browse/FLINK-13279 is fixed + assertEquals(Arrays.asList("1\t1", "2\tNULL"), hiveShell.executeQuery("select * from db1.part")); + + hiveShell.execute("drop database db1 cascade"); + } +}