HAWQ-1215. Support complex types with HiveORC
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/3b15739a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/3b15739a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/3b15739a

Branch: refs/heads/2.1.0.0-incubating
Commit: 3b15739a009601049f7343131abe889b204b4d62
Parents: aa5792d
Author: Kavinder Dhaliwal <kavind...@gmail.com>
Authored: Fri Dec 23 16:27:58 2016 -0800
Committer: Kavinder Dhaliwal <kavind...@gmail.com>
Committed: Wed Jan 11 14:14:36 2017 -0800

----------------------------------------------------------------------
 .../plugins/hive/HiveInputFormatFragmenter.java |  5 +--
 .../pxf/plugins/hive/HiveORCSerdeResolver.java  | 32 ++++++++++++++++++++
 .../plugins/hive/utilities/HiveUtilities.java   |  3 +-
 3 files changed, 37 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3b15739a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
index 051a246..ca4501b 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
@@ -26,7 +26,6 @@ import org.apache.hawq.pxf.api.UserDataException;
 import org.apache.hawq.pxf.api.io.DataType;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.InputData;
-import org.apache.hawq.pxf.plugins.hive.utilities.EnumHiveToHawqType;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Properties;
 
 /**
  * Specialized Hive fragmenter for RC and Text files tables. Unlike the
@@ -55,10 +55,11 @@ import java.util.List;
  */
 public class HiveInputFormatFragmenter extends HiveDataFragmenter {
     private static final Log LOG = LogFactory.getLog(HiveInputFormatFragmenter.class);
-    private static final int EXPECTED_NUM_OF_TOKS = 3;
+    private static final int EXPECTED_NUM_OF_TOKS = 4;
     public static final int TOK_SERDE = 0;
     public static final int TOK_KEYS = 1;
     public static final int TOK_FILTER_DONE = 2;
+    public static final int TOK_COL_TYPES = 3;
 
     /** Defines the Hive input formats currently supported in pxf */
     public enum PXF_HIVE_INPUT_FORMATS {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3b15739a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
index 7673713..93aa474 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
@@ -45,6 +45,7 @@ public class HiveORCSerdeResolver extends HiveResolver {
     private static final Log LOG = LogFactory.getLog(HiveORCSerdeResolver.class);
     private OrcSerde deserializer;
     private HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType;
+    private String typesString;
 
     public HiveORCSerdeResolver(InputData input) throws Exception {
         super(input);
@@ -61,6 +62,7 @@ public class HiveORCSerdeResolver extends HiveResolver {
             throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr);
         }
         partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
+        typesString = toks[HiveInputFormatFragmenter.TOK_COL_TYPES];
         collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM
                 : input.getUserProperty("COLLECTION_DELIM");
         mapkeyDelim = input.getUserProperty("MAPKEY_DELIM") == null ? MAPKEY_DELIM
@@ -102,11 +104,19 @@ public class HiveORCSerdeResolver extends HiveResolver {
         StringBuilder columnNames = new StringBuilder(numberOfDataColumns * 2); // column + delimiter
         StringBuilder columnTypes = new StringBuilder(numberOfDataColumns * 2); // column + delimiter
 
+        String[] cols = typesString.split(":");
+        String[] hiveColTypes = new String[numberOfDataColumns];
+        parseColTypes(cols, hiveColTypes);
+
         String delim = ",";
         for (int i = 0; i < numberOfDataColumns; i++) {
             ColumnDescriptor column = input.getColumn(i);
             String columnName = column.columnName();
             String columnType = HiveUtilities.toCompatibleHiveType(DataType.get(column.columnTypeCode()), column.columnTypeModifiers());
+            //Complex Types will have a mismatch between Hive and Hawq type
+            if (!columnType.equals(hiveColTypes[i])) {
+                columnType = hiveColTypes[i];
+            }
             if(i > 0) {
                 columnNames.append(delim);
                 columnTypes.append(delim);
@@ -125,4 +135,26 @@ public class HiveORCSerdeResolver extends HiveResolver {
         deserializer.initialize(new JobConf(new Configuration(), HiveORCSerdeResolver.class), serdeProperties);
     }
 
+    private void parseColTypes(String[] cols, String[] output) {
+        int i = 0;
+        StringBuilder structTypeBuilder = new StringBuilder();
+        boolean inStruct = false;
+        for (String str : cols) {
+            if (str.contains("struct")) {
+                structTypeBuilder = new StringBuilder();
+                inStruct = true;
+                structTypeBuilder.append(str);
+            } else if (inStruct) {
+                structTypeBuilder.append(':');
+                structTypeBuilder.append(str);
+                if (str.contains(">")) {
+                    inStruct = false;
+                    output[i++] = structTypeBuilder.toString();
+                }
+            } else {
+                output[i++] = str;
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3b15739a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
index ffd66b8..f7ebf4d 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
@@ -444,9 +444,10 @@ public class HiveUtilities {
             String inputFormatName = partData.storageDesc.getInputFormat();
             String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib();
             String partitionKeys = serializePartitionKeys(partData);
+            String colTypes = partData.properties.getProperty("columns.types");
             assertFileType(inputFormatName, partData);
             userData = assertSerde(serdeName, partData) + HiveDataFragmenter.HIVE_UD_DELIM
-                    + partitionKeys + HiveDataFragmenter.HIVE_UD_DELIM + filterInFragmenter;
+                    + partitionKeys + HiveDataFragmenter.HIVE_UD_DELIM + filterInFragmenter + HiveDataFragmenter.HIVE_UD_DELIM + colTypes;
         } else if (HiveDataFragmenter.class.isAssignableFrom(fragmenterClass)){
             String inputFormatName = partData.storageDesc.getInputFormat();
             String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib();
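
For context on the parseColTypes() addition above: Hive stores a table's column types as a single colon-delimited string in the "columns.types" table property, which HiveUtilities now forwards to the resolver as the fourth user-data token (TOK_COL_TYPES). Splitting that string on ':' also splits the field list inside a struct<...> definition, so the resolver stitches those fragments back into one type name. The standalone sketch below mirrors that re-assembly; the sample columns.types value and the ParseColTypesDemo class name are illustrative only and not part of the commit.

import java.util.Arrays;

// Standalone illustration of the re-assembly done by HiveORCSerdeResolver.parseColTypes():
// a columns.types value such as "int:string:struct<street:string,zip:int>" is split on ':'
// and the pieces that belong to a struct<...> type are joined back together.
public class ParseColTypesDemo {
    public static void main(String[] args) {
        String typesString = "int:string:struct<street:string,zip:int>"; // made-up sample value
        String[] cols = typesString.split(":");
        String[] hiveColTypes = new String[3];          // the sample table has 3 columns
        parseColTypes(cols, hiveColTypes);
        System.out.println(Arrays.toString(hiveColTypes));
        // prints: [int, string, struct<street:string,zip:int>]
    }

    // Same logic as the private method added in the commit: tokens from the one containing
    // "struct" up to the one containing ">" are re-joined with ':'; all others pass through.
    private static void parseColTypes(String[] cols, String[] output) {
        int i = 0;
        StringBuilder structTypeBuilder = new StringBuilder();
        boolean inStruct = false;
        for (String str : cols) {
            if (str.contains("struct")) {
                structTypeBuilder = new StringBuilder();
                inStruct = true;
                structTypeBuilder.append(str);
            } else if (inStruct) {
                structTypeBuilder.append(':');
                structTypeBuilder.append(str);
                if (str.contains(">")) {
                    inStruct = false;
                    output[i++] = structTypeBuilder.toString();
                }
            } else {
                output[i++] = str;
            }
        }
    }
}

As written, the scan keys off the literal tokens "struct" and ">", which matches the single-level struct case this change targets.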