This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 24078de  [FLINK-13192][hive] Add tests for different Hive table formats
24078de is described below

commit 24078de3dc9f9c4f016da3d1acd909d8311431dd
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Jul 29 21:02:52 2019 +0800

    [FLINK-13192][hive] Add tests for different Hive table formats
    
    This closes #9264
---
 flink-connectors/flink-connector-hive/pom.xml      |  4 ---
 .../connectors/hive/HiveTableOutputFormat.java     | 19 ++++++----
 .../flink/table/catalog/hive/HiveCatalog.java      | 23 +++++++++---
 .../connectors/hive/TableEnvHiveConnectorTest.java | 42 ++++++++++++++++++++++
 4 files changed, 73 insertions(+), 15 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 88ef8a1..9752be9 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -416,10 +416,6 @@ under the License.
                                        <artifactId>tez-common</artifactId>
                                </exclusion>
                                <exclusion>
-                                       <groupId>org.apache.tez</groupId>
-                                       <artifactId>tez-mapreduce</artifactId>
-                               </exclusion>
-                               <exclusion>
                                        <!-- This dependency is no longer shipped with the JDK since Java 9.-->
                                        <groupId>jdk.tools</groupId>
                                        <artifactId>jdk.tools</artifactId>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
index e4caac1..9e1ee46 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
@@ -58,9 +58,10 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -122,7 +123,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
        // number of non-partitioning columns
        private transient int numNonPartitionColumns;
 
-       private transient AbstractSerDe serializer;
+       // SerDe in Hive-1.2.1 and Hive-2.3.4 can be of different classes, make sure to use a common base class
+       private transient Serializer recordSerDe;
        //StructObjectInspector represents the hive row structure.
        private transient StructObjectInspector rowObjectInspector;
        private transient Class<? extends Writable> outputClass;
@@ -257,11 +259,14 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
        public void open(int taskNumber, int numTasks) throws IOException {
                try {
                        StorageDescriptor sd = hiveTablePartition.getStorageDescriptor();
-                       serializer = (AbstractSerDe) Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
-                       ReflectionUtils.setConf(serializer, jobConf);
+                       Object serdeLib = Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
+                       Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
+                                       "Expect a SerDe lib implementing both Serializer and Deserializer, but actually got " + serdeLib.getClass().getName());
+                       recordSerDe = (Serializer) serdeLib;
+                       ReflectionUtils.setConf(recordSerDe, jobConf);
                        // TODO: support partition properties, for now assume they're same as table properties
-                       SerDeUtils.initializeSerDe(serializer, jobConf, tableProperties, null);
-                       outputClass = serializer.getSerializedClass();
+                       SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
+                       outputClass = recordSerDe.getSerializedClass();
                } catch (IllegalAccessException | SerDeException | InstantiationException | ClassNotFoundException e) {
                        throw new FlinkRuntimeException("Error initializing Hive serializer", e);
                }
@@ -331,7 +336,7 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
                                        partitionToWriter.put(partName, partitionWriter);
                                }
                        }
-                       partitionWriter.recordWriter.write(serializer.serialize(getConvertedRow(record), rowObjectInspector));
+                       partitionWriter.recordWriter.write(recordSerDe.serialize(getConvertedRow(record), rowObjectInspector));
                } catch (IOException | SerDeException e) {
                        throw new IOException("Could not write Record.", e);
                } catch (MetaException e) {
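
Note: the open() change above loads the Hive SerDe reflectively and uses it only through the Serializer/Deserializer interfaces, because the concrete SerDe classes differ between Hive 1.2.1 and Hive 2.3.4. A minimal, self-contained sketch of that pattern (the class and method names below are illustrative, not part of the patch):

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.Serializer;

public class SerDeLoadingSketch {

	// Instantiate the SerDe named in the table's storage descriptor and make sure it
	// implements both Serializer and Deserializer before using it.
	static Serializer loadSerDe(String serdeClassName, Configuration conf, Properties tableProps)
			throws ReflectiveOperationException, SerDeException {
		Object serdeLib = Class.forName(serdeClassName).newInstance();
		if (!(serdeLib instanceof Serializer) || !(serdeLib instanceof Deserializer)) {
			throw new IllegalArgumentException(
					"Expect a SerDe lib implementing both Serializer and Deserializer, but got "
							+ serdeLib.getClass().getName());
		}
		// Initialization goes through the Deserializer view; writing records later uses
		// the Serializer view (serialize(...) and getSerializedClass()).
		SerDeUtils.initializeSerDe((Deserializer) serdeLib, conf, tableProps, null);
		return (Serializer) serdeLib;
	}
}

Keeping only the two interfaces in the field type (recordSerDe) is what lets the same output format work against SerDes that no longer share AbstractSerDe across Hive versions.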
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 261180f..dd50ce2 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -81,6 +81,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
 import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.thrift.TException;
@@ -323,7 +324,7 @@ public class HiveCatalog extends AbstractCatalog {
                checkNotNull(tablePath, "tablePath cannot be null");
 
                Table hiveTable = getHiveTable(tablePath);
-               return instantiateCatalogTable(hiveTable);
+               return instantiateCatalogTable(hiveTable, hiveConf);
        }
 
        @Override
@@ -394,7 +395,7 @@ public class HiveCatalog extends AbstractCatalog {
                        return;
                }
 
-               CatalogBaseTable existingTable = instantiateCatalogTable(hiveTable);
+               CatalogBaseTable existingTable = instantiateCatalogTable(hiveTable, hiveConf);
 
                if (existingTable.getClass() != newCatalogTable.getClass()) {
                        throw new CatalogException(
@@ -493,7 +494,7 @@ public class HiveCatalog extends AbstractCatalog {
                }
        }
 
-       private static CatalogBaseTable instantiateCatalogTable(Table hiveTable) {
+       private static CatalogBaseTable instantiateCatalogTable(Table hiveTable, HiveConf hiveConf) {
                boolean isView = TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW;
 
                // Table properties
@@ -506,8 +507,22 @@ public class HiveCatalog extends AbstractCatalog {
                String comment = properties.remove(HiveCatalogConfig.COMMENT);
 
                // Table schema
+               List<FieldSchema> fields;
+               if (org.apache.hadoop.hive.ql.metadata.Table.hasMetastoreBasedSchema(hiveConf,
+                               hiveTable.getSd().getSerdeInfo().getSerializationLib())) {
+                       // get schema from metastore
+                       fields = hiveTable.getSd().getCols();
+               } else {
+                       // get schema from deserializer
+                       try {
+                               fields = MetaStoreUtils.getFieldsFromDeserializer(hiveTable.getTableName(),
+                                               MetaStoreUtils.getDeserializer(hiveConf, hiveTable, true));
+                       } catch (SerDeException | MetaException e) {
+                               throw new CatalogException("Failed to get Hive table schema from deserializer", e);
+                       }
+               }
                TableSchema tableSchema =
-                       HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
+                       HiveTableUtil.createTableSchema(fields, hiveTable.getPartitionKeys());
 
                // Partition keys
                List<String> partitionKeys = new ArrayList<>();
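
Note: instantiateCatalogTable now distinguishes tables whose schema is stored as metastore columns from tables whose schema is owned by the SerDe, such as the OpenCSVSerde used by the new "csv" test case below. A minimal sketch of that decision, assuming the same Hive APIs the patch calls (the class and method names here are illustrative):

import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde2.SerDeException;

public class HiveSchemaSketch {

	// Resolve the table's field list either from the metastore columns or from the SerDe.
	static List<FieldSchema> resolveFields(HiveConf hiveConf, Table hiveTable)
			throws SerDeException, MetaException {
		String serdeLib = hiveTable.getSd().getSerdeInfo().getSerializationLib();
		if (org.apache.hadoop.hive.ql.metadata.Table.hasMetastoreBasedSchema(hiveConf, serdeLib)) {
			// Schema is stored as columns in the metastore.
			return hiveTable.getSd().getCols();
		}
		// Schema is owned by the SerDe; derive it from the instantiated deserializer.
		return MetaStoreUtils.getFieldsFromDeserializer(
				hiveTable.getTableName(), MetaStoreUtils.getDeserializer(hiveConf, hiveTable, true));
	}
}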
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 4ac76cd..e8c402a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -36,6 +36,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -100,10 +102,50 @@ public class TableEnvHiveConnectorTest {
                hiveShell.execute("drop database db1 cascade");
        }
 
+       @Test
+       public void testDifferentFormats() throws Exception {
+               String[] formats = new String[]{"orc", "parquet", "sequencefile", "csv"};
+               for (String format : formats) {
+                       readWriteFormat(format);
+               }
+       }
+
+       private void readWriteFormat(String format) throws Exception {
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+
+               hiveShell.execute("create database db1");
+
+               // create source and dest tables
+               String suffix;
+               if (format.equals("csv")) {
+                       suffix = "row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'";
+               } else {
+                       suffix = "stored as " + format;
+               }
+               hiveShell.execute("create table db1.src (i int,s string) " + suffix);
+               hiveShell.execute("create table db1.dest (i int,s string) " + suffix);
+
+               // prepare source data with Hive
+               hiveShell.execute("insert into db1.src values (1,'a'),(2,'b')");
+
+               // populate dest table with source table
+               tableEnv.sqlUpdate("insert into db1.dest select * from db1.src");
+               tableEnv.execute("test_" + format);
+
+               // verify data on hive side
+               verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta", "2\tb"));
+
+               hiveShell.execute("drop database db1 cascade");
+       }
+
        private TableEnvironment getTableEnvWithHiveCatalog() {
                TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
                tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
                tableEnv.useCatalog(hiveCatalog.getName());
                return tableEnv;
        }
+
+       private void verifyHiveQueryResult(String query, List<String> expected) {
+               assertEquals(new HashSet<>(expected), new HashSet<>(hiveShell.executeQuery(query)));
+       }
 }
