HIVE-9845 : HCatSplit repeats information making input split data size huge (Mithun Radhakrishnan via Sushanth Sowmyan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/18fb4601
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/18fb4601
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/18fb4601

Branch: refs/heads/beeline-cli
Commit: 18fb460179ff48d2c1e65f324799b4315616f14b
Parents: dc72c87
Author: Sushanth Sowmyan <khorg...@gmail.com>
Authored: Wed May 6 14:03:37 2015 -0700
Committer: Sushanth Sowmyan <khorg...@gmail.com>
Committed: Wed May 6 14:04:32 2015 -0700

----------------------------------------------------------------------
 .../hcatalog/mapreduce/HCatBaseInputFormat.java |  20 ++--
 .../hive/hcatalog/mapreduce/HCatSplit.java      |  21 +---
 .../hive/hcatalog/mapreduce/HCatTableInfo.java  |  12 ++
 .../hive/hcatalog/mapreduce/InputJobInfo.java   |   5 +
 .../hive/hcatalog/mapreduce/PartInfo.java       | 117 +++++++++++++++++--
 .../mapreduce/TestHCatOutputFormat.java         |   5 +-
 6 files changed, 139 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/18fb4601/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
index 55b97dd..adfaf4e 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
@@ -130,16 +130,6 @@ public abstract class HCatBaseInputFormat
       setInputPath(jobConf, partitionInfo.getLocation());
       Map<String, String> jobProperties = partitionInfo.getJobProperties();
 
-      HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
-      for (HCatFieldSchema field :
-        inputJobInfo.getTableInfo().getDataColumns().getFields()) {
-        allCols.append(field);
-      }
-      for (HCatFieldSchema field :
-        inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
-        allCols.append(field);
-      }
-
       HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
 
       storageHandler = HCatUtil.getStorageHandler(
@@ -163,9 +153,7 @@ public abstract class HCatBaseInputFormat
         inputFormat.getSplits(jobConf, desiredNumSplits);
 
       for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
-        splits.add(new HCatSplit(
-          partitionInfo,
-          split, allCols));
+        splits.add(new HCatSplit(partitionInfo, split));
       }
     }
 
@@ -190,6 +178,12 @@ public abstract class HCatBaseInputFormat
     HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
 
     PartInfo partitionInfo = hcatSplit.getPartitionInfo();
+    // Ensure PartInfo's TableInfo is initialized.
+    if (partitionInfo.getTableInfo() == null) {
+      partitionInfo.setTableInfo(((InputJobInfo)HCatUtil.deserialize(
+          taskContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO)
+      )).getTableInfo());
+    }
     JobContext jobContext = taskContext;
     Configuration conf = jobContext.getConfiguration();

http://git-wip-us.apache.org/repos/asf/hive/blob/18fb4601/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java
index bcedb3a..0aa498a 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 
 import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -44,11 +43,6 @@ public class HCatSplit extends InputSplit
   /** The split returned by the underlying InputFormat split. */
   private org.apache.hadoop.mapred.InputSplit baseMapRedSplit;
 
-  /** The schema for the HCatTable */
-  private HCatSchema tableSchema;
-
-  private HiveConf hiveConf;
-
   /**
    * Instantiates a new hcat split.
    */
@@ -60,16 +54,13 @@ public class HCatSplit extends InputSplit
    *
    * @param partitionInfo the partition info
    * @param baseMapRedSplit the base mapred split
-   * @param tableSchema the table level schema
    */
   public HCatSplit(PartInfo partitionInfo,
-           org.apache.hadoop.mapred.InputSplit baseMapRedSplit,
-           HCatSchema tableSchema) {
+           org.apache.hadoop.mapred.InputSplit baseMapRedSplit) {
 
     this.partitionInfo = partitionInfo;
     // dataSchema can be obtained from partitionInfo.getPartitionSchema()
     this.baseMapRedSplit = baseMapRedSplit;
-    this.tableSchema = tableSchema;
   }
 
   /**
@@ -101,7 +92,8 @@ public class HCatSplit extends InputSplit
    * @return the table schema
    */
   public HCatSchema getTableSchema() {
-    return this.tableSchema;
+    assert this.partitionInfo.getTableInfo() != null : "TableInfo should have been set at this point.";
+    return this.partitionInfo.getTableInfo().getAllColumns();
   }
 
   /* (non-Javadoc)
@@ -159,9 +151,6 @@ public class HCatSplit extends InputSplit
     } catch (Exception e) {
       throw new IOException("Exception from " + baseSplitClassName, e);
     }
-
-    String tableSchemaString = WritableUtils.readString(input);
-    tableSchema = (HCatSchema) HCatUtil.deserialize(tableSchemaString);
   }
 
   /* (non-Javadoc)
@@ -178,10 +167,6 @@ public class HCatSplit extends InputSplit
     Writable baseSplitWritable = (Writable) baseMapRedSplit;
 
     //write baseSplit into output
     baseSplitWritable.write(output);
-
-    //write the table schema into output
-    String tableSchemaString = HCatUtil.serialize(tableSchema);
-    WritableUtils.writeString(output, tableSchemaString);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/18fb4601/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java
index 13faf15..14c93ab 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java
@@ -21,10 +21,13 @@ package org.apache.hive.hcatalog.mapreduce;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
 
 /**
@@ -112,6 +115,15 @@ public class HCatTableInfo implements Serializable {
   }
 
   /**
+   * @return HCatSchema with all columns (i.e. data and partition columns).
+   */
+  public HCatSchema getAllColumns() {
+    List<HCatFieldSchema> allColumns = Lists.newArrayList(dataColumns.getFields());
+    allColumns.addAll(partitionColumns.getFields());
+    return new HCatSchema(allColumns);
+  }
+
+  /**
    * @return the storerInfo
    */
   public StorerInfo getStorerInfo() {

http://git-wip-us.apache.org/repos/asf/hive/blob/18fb4601/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
index 360e77b..1f23f3f 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
@@ -182,5 +182,10 @@ public class InputJobInfo implements Serializable {
     ObjectInputStream partInfoReader =
         new ObjectInputStream(new InflaterInputStream(ois));
     partitions = (List<PartInfo>)partInfoReader.readObject();
+    for (PartInfo partInfo : partitions) {
+      if (partInfo.getTableInfo() == null) {
+        partInfo.setTableInfo(this.tableInfo);
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/18fb4601/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java
index 651a9a0..fca0a92 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java
@@ -18,27 +18,32 @@
  */
 package org.apache.hive.hcatalog.mapreduce;
 
+import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** The Class used to serialize the partition information read from the metadata server that maps to a partition. */
 public class PartInfo implements Serializable {
+  private static Logger LOG = LoggerFactory.getLogger(PartInfo.class);
 
   /** The serialization version */
   private static final long serialVersionUID = 1L;
 
-  /** The partition schema. */
-  private final HCatSchema partitionSchema;
+  /** The partition data-schema. */
+  private HCatSchema partitionSchema;
 
   /** The information about which input storage handler to use */
-  private final String storageHandlerClassName;
-  private final String inputFormatClassName;
-  private final String outputFormatClassName;
-  private final String serdeClassName;
+  private String storageHandlerClassName;
+  private String inputFormatClassName;
+  private String outputFormatClassName;
+  private String serdeClassName;
 
   /** HCat-specific properties set at the partition */
   private final Properties hcatProperties;
@@ -52,8 +57,11 @@ public class PartInfo implements Serializable {
   /** Job properties associated with this parition */
   Map<String, String> jobProperties;
 
-  /** the table info associated with this partition */
-  HCatTableInfo tableInfo;
+  /**
+   * The table info associated with this partition.
+   * Not serialized per PartInfo instance. Constant, per table.
+   */
+  transient HCatTableInfo tableInfo;
 
   /**
    * Instantiates a new hcat partition info.
@@ -162,4 +170,97 @@ public class PartInfo implements Serializable {
   public HCatTableInfo getTableInfo() {
     return tableInfo;
   }
+
+  void setTableInfo(HCatTableInfo thatTableInfo) {
+    this.tableInfo = thatTableInfo;
+
+    if (partitionSchema == null) {
+      partitionSchema = tableInfo.getDataColumns();
+    }
+
+    if (storageHandlerClassName == null) {
+      storageHandlerClassName = tableInfo.getStorerInfo().getStorageHandlerClass();
+    }
+
+    if (inputFormatClassName == null) {
+      inputFormatClassName = tableInfo.getStorerInfo().getIfClass();
+    }
+
+    if (outputFormatClassName == null) {
+      outputFormatClassName = tableInfo.getStorerInfo().getOfClass();
+    }
+
+    if (serdeClassName == null) {
+      serdeClassName = tableInfo.getStorerInfo().getSerdeClass();
+    }
+  }
+
+  /**
+   * Serialization method. Suppresses serialization of redundant information that's already
+   * available from TableInfo.
+   */
+  private void writeObject(ObjectOutputStream oos)
+      throws IOException {
+    // Suppress commonality with TableInfo.
+
+    assert tableInfo != null : "TableInfo can't be null at this point.";
+
+    if (partitionSchema != null) {
+      if (partitionSchema.equals(tableInfo.getDataColumns())) {
+        partitionSchema = null;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Can't suppress data-schema. Partition-schema and table-schema seem to differ! "
+              + " partitionSchema: " + partitionSchema.getFields()
+              + " tableSchema: " + tableInfo.getDataColumns());
+        }
+      }
+    }
+
+    if (storageHandlerClassName != null) {
+      if (storageHandlerClassName.equals(tableInfo.getStorerInfo().getStorageHandlerClass())) {
+        storageHandlerClassName = null;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Partition's storageHandler (" + storageHandlerClassName + ") "
+              + "differs from table's storageHandler (" + tableInfo.getStorerInfo().getStorageHandlerClass() + ").");
+        }
+      }
+    }
+
+    if (inputFormatClassName != null) {
+      if (inputFormatClassName.equals(tableInfo.getStorerInfo().getIfClass())) {
+        inputFormatClassName = null;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Partition's InputFormat (" + inputFormatClassName + ") "
+              + "differs from table's InputFormat (" + tableInfo.getStorerInfo().getIfClass() + ").");
+        }
+      }
+    }
+
+    if (outputFormatClassName != null) {
+      if (outputFormatClassName.equals(tableInfo.getStorerInfo().getOfClass())) {
+        outputFormatClassName = null;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Partition's OutputFormat (" + outputFormatClassName + ") "
+              + "differs from table's OutputFormat (" + tableInfo.getStorerInfo().getOfClass() + ").");
+        }
+      }
+    }
+
+    if (serdeClassName != null) {
+      if (serdeClassName.equals(tableInfo.getStorerInfo().getSerdeClass())) {
+        serdeClassName = null;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Partition's SerDe (" + serdeClassName + ") "
+              + "differs from table's SerDe (" + tableInfo.getStorerInfo().getSerdeClass() + ").");
+        }
+      }
+    }
+
+    oos.defaultWriteObject();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/18fb4601/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
index add9d41..f716da9 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Lists;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
@@ -106,7 +107,7 @@ public class TestHCatOutputFormat extends TestCase {
     tbl.setDbName(dbName);
     tbl.setTableName(tblName);
     StorageDescriptor sd = new StorageDescriptor();
-    sd.setCols(fields);
+    sd.setCols(Lists.newArrayList(new FieldSchema("data_column", serdeConstants.STRING_TYPE_NAME, "")));
     tbl.setSd(sd);
 
     //sd.setLocation("hdfs://tmp");
@@ -151,7 +152,7 @@ public class TestHCatOutputFormat extends TestCase {
     assertEquals(1, jobInfo.getPartitionValues().size());
     assertEquals("p1", jobInfo.getPartitionValues().get("colname"));
     assertEquals(1, jobInfo.getTableInfo().getDataColumns().getFields().size());
-    assertEquals("colname", jobInfo.getTableInfo().getDataColumns().getFields().get(0).getName());
+    assertEquals("data_column", jobInfo.getTableInfo().getDataColumns().getFields().get(0).getName());
 
     publishTest(job);
   }