This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 24078de  [FLINK-13192][hive] Add tests for different Hive table formats
24078de is described below

commit 24078de3dc9f9c4f016da3d1acd909d8311431dd
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Jul 29 21:02:52 2019 +0800

    [FLINK-13192][hive] Add tests for different Hive table formats

    This closes #9264
---
 flink-connectors/flink-connector-hive/pom.xml       |  4 ---
 .../connectors/hive/HiveTableOutputFormat.java      | 19 ++++++----
 .../flink/table/catalog/hive/HiveCatalog.java       | 23 +++++++++---
 .../connectors/hive/TableEnvHiveConnectorTest.java  | 42 ++++++++++++++++++++++
 4 files changed, 73 insertions(+), 15 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 88ef8a1..9752be9 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -416,10 +416,6 @@ under the License.
 						<artifactId>tez-common</artifactId>
 					</exclusion>
 					<exclusion>
-						<groupId>org.apache.tez</groupId>
-						<artifactId>tez-mapreduce</artifactId>
-					</exclusion>
-					<exclusion>
 						<!-- This dependency is no longer shipped with the JDK since Java 9.-->
 						<groupId>jdk.tools</groupId>
 						<artifactId>jdk.tools</artifactId>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
index e4caac1..9e1ee46 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
@@ -58,9 +58,10 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -122,7 +123,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 	// number of non-partitioning columns
 	private transient int numNonPartitionColumns;
 
-	private transient AbstractSerDe serializer;
+	// SerDe in Hive-1.2.1 and Hive-2.3.4 can be of different classes, make sure to use a common base class
+	private transient Serializer recordSerDe;
 	//StructObjectInspector represents the hive row structure.
 	private transient StructObjectInspector rowObjectInspector;
 	private transient Class<? extends Writable> outputClass;
@@ -257,11 +259,14 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 	public void open(int taskNumber, int numTasks) throws IOException {
 		try {
 			StorageDescriptor sd = hiveTablePartition.getStorageDescriptor();
-			serializer = (AbstractSerDe) Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
-			ReflectionUtils.setConf(serializer, jobConf);
+			Object serdeLib = Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
+			Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
+					"Expect a SerDe lib implementing both Serializer and Deserializer, but actually got " + serdeLib.getClass().getName());
+			recordSerDe = (Serializer) serdeLib;
+			ReflectionUtils.setConf(recordSerDe, jobConf);
 			// TODO: support partition properties, for now assume they're same as table properties
-			SerDeUtils.initializeSerDe(serializer, jobConf, tableProperties, null);
-			outputClass = serializer.getSerializedClass();
+			SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
+			outputClass = recordSerDe.getSerializedClass();
 		} catch (IllegalAccessException | SerDeException | InstantiationException | ClassNotFoundException e) {
 			throw new FlinkRuntimeException("Error initializing Hive serializer", e);
 		}
@@ -331,7 +336,7 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 					partitionToWriter.put(partName, partitionWriter);
 				}
 			}
-			partitionWriter.recordWriter.write(serializer.serialize(getConvertedRow(record), rowObjectInspector));
+			partitionWriter.recordWriter.write(recordSerDe.serialize(getConvertedRow(record), rowObjectInspector));
 		} catch (IOException | SerDeException e) {
 			throw new IOException("Could not write Record.", e);
 		} catch (MetaException e) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 261180f..dd50ce2 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -81,6 +81,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
 import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.thrift.TException;
@@ -323,7 +324,7 @@ public class HiveCatalog extends AbstractCatalog {
 		checkNotNull(tablePath, "tablePath cannot be null");
 
 		Table hiveTable = getHiveTable(tablePath);
-		return instantiateCatalogTable(hiveTable);
+		return instantiateCatalogTable(hiveTable, hiveConf);
 	}
 
 	@Override
@@ -394,7 +395,7 @@ public class HiveCatalog extends AbstractCatalog {
 			return;
 		}
 
-		CatalogBaseTable existingTable = instantiateCatalogTable(hiveTable);
+		CatalogBaseTable existingTable = instantiateCatalogTable(hiveTable, hiveConf);
 
 		if (existingTable.getClass() != newCatalogTable.getClass()) {
 			throw new CatalogException(
@@ -493,7 +494,7 @@ public class HiveCatalog extends AbstractCatalog {
 		}
 	}
 
-	private static CatalogBaseTable instantiateCatalogTable(Table hiveTable) {
+	private static CatalogBaseTable instantiateCatalogTable(Table hiveTable, HiveConf hiveConf) {
 		boolean isView = TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW;
 
 		// Table properties
@@ -506,8 +507,22 @@ public class HiveCatalog extends AbstractCatalog {
 		String comment = properties.remove(HiveCatalogConfig.COMMENT);
 
 		// Table schema
+		List<FieldSchema> fields;
+		if (org.apache.hadoop.hive.ql.metadata.Table.hasMetastoreBasedSchema(hiveConf,
+				hiveTable.getSd().getSerdeInfo().getSerializationLib())) {
+			// get schema from metastore
+			fields = hiveTable.getSd().getCols();
+		} else {
+			// get schema from deserializer
+			try {
+				fields = MetaStoreUtils.getFieldsFromDeserializer(hiveTable.getTableName(),
+						MetaStoreUtils.getDeserializer(hiveConf, hiveTable, true));
+			} catch (SerDeException | MetaException e) {
+				throw new CatalogException("Failed to get Hive table schema from deserializer", e);
+			}
+		}
 		TableSchema tableSchema =
-			HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
+			HiveTableUtil.createTableSchema(fields, hiveTable.getPartitionKeys());
 
 		// Partition keys
 		List<String> partitionKeys = new ArrayList<>();
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 4ac76cd..e8c402a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -36,6 +36,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -100,10 +102,50 @@ public class TableEnvHiveConnectorTest {
 		hiveShell.execute("drop database db1 cascade");
 	}
 
+	@Test
+	public void testDifferentFormats() throws Exception {
+		String[] formats = new String[]{"orc", "parquet", "sequencefile", "csv"};
+		for (String format : formats) {
+			readWriteFormat(format);
+		}
+	}
+
+	private void readWriteFormat(String format) throws Exception {
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+
+		hiveShell.execute("create database db1");
+
+		// create source and dest tables
+		String suffix;
+		if (format.equals("csv")) {
+			suffix = "row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'";
+		} else {
+			suffix = "stored as " + format;
+		}
+		hiveShell.execute("create table db1.src (i int,s string) " + suffix);
+		hiveShell.execute("create table db1.dest (i int,s string) " + suffix);
+
+		// prepare source data with Hive
+		hiveShell.execute("insert into db1.src values (1,'a'),(2,'b')");
+
+		// populate dest table with source table
+		tableEnv.sqlUpdate("insert into db1.dest select * from db1.src");
+		tableEnv.execute("test_" + format);
+
+		// verify data on hive side
+		verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta", "2\tb"));
+
+		hiveShell.execute("drop database db1 cascade");
+	}
+
 	private TableEnvironment getTableEnvWithHiveCatalog() {
 		TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
 		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
 		tableEnv.useCatalog(hiveCatalog.getName());
 		return tableEnv;
 	}
+
+	private void verifyHiveQueryResult(String query, List<String> expected) {
+		assertEquals(new HashSet<>(expected), new HashSet<>(hiveShell.executeQuery(query)));
+	}
 }