HIVE-9845 : HCatSplit repeats information making input split data size huge 
(Mithun Radhakrishnan via Sushanth Sowmyan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/18fb4601
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/18fb4601
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/18fb4601

Branch: refs/heads/beeline-cli
Commit: 18fb460179ff48d2c1e65f324799b4315616f14b
Parents: dc72c87
Author: Sushanth Sowmyan <khorg...@gmail.com>
Authored: Wed May 6 14:03:37 2015 -0700
Committer: Sushanth Sowmyan <khorg...@gmail.com>
Committed: Wed May 6 14:04:32 2015 -0700

----------------------------------------------------------------------
 .../hcatalog/mapreduce/HCatBaseInputFormat.java |  20 ++--
 .../hive/hcatalog/mapreduce/HCatSplit.java      |  21 +---
 .../hive/hcatalog/mapreduce/HCatTableInfo.java  |  12 ++
 .../hive/hcatalog/mapreduce/InputJobInfo.java   |   5 +
 .../hive/hcatalog/mapreduce/PartInfo.java       | 117 +++++++++++++++++--
 .../mapreduce/TestHCatOutputFormat.java         |   5 +-
 6 files changed, 139 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/18fb4601/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
index 55b97dd..adfaf4e 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
@@ -130,16 +130,6 @@ public abstract class HCatBaseInputFormat
       setInputPath(jobConf, partitionInfo.getLocation());
       Map<String, String> jobProperties = partitionInfo.getJobProperties();
 
-      HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
-      for (HCatFieldSchema field :
-        inputJobInfo.getTableInfo().getDataColumns().getFields()) {
-        allCols.append(field);
-      }
-      for (HCatFieldSchema field :
-        inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
-        allCols.append(field);
-      }
-
       HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
 
       storageHandler = HCatUtil.getStorageHandler(
@@ -163,9 +153,7 @@ public abstract class HCatBaseInputFormat
         inputFormat.getSplits(jobConf, desiredNumSplits);
 
       for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
-        splits.add(new HCatSplit(
-          partitionInfo,
-          split, allCols));
+        splits.add(new HCatSplit(partitionInfo, split));
       }
     }
 
@@ -190,6 +178,12 @@ public abstract class HCatBaseInputFormat
 
     HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
     PartInfo partitionInfo = hcatSplit.getPartitionInfo();
+    // Ensure PartInfo's TableInfo is initialized.
+    if (partitionInfo.getTableInfo() == null) {
+      partitionInfo.setTableInfo(((InputJobInfo)HCatUtil.deserialize(
+          taskContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO)
+      )).getTableInfo());
+    }
     JobContext jobContext = taskContext;
     Configuration conf = jobContext.getConfiguration();
 

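The getSplits() change above stops packing a full copy of the table schema into every HCatSplit; instead, createRecordReader() rehydrates the transient TableInfo on the task side from the job-level InputJobInfo already serialized into the Configuration. Below is a minimal, self-contained sketch of that rehydration pattern, assuming illustrative stand-in classes and a plain Map in place of Hadoop's Configuration (none of these are the real HCatalog types):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

public class LazyRehydrationSketch {

  // Stand-in for HCatTableInfo: table-level state shared by all splits.
  static class TableInfo implements Serializable {
    final String schema;
    TableInfo(String schema) { this.schema = schema; }
  }

  // Stand-in for HCatSplit/PartInfo: the shared state is transient,
  // so it is never serialized per split.
  static class Split {
    transient TableInfo tableInfo;
  }

  static String serialize(Serializable obj) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(obj);
    }
    return Base64.getEncoder().encodeToString(bos.toByteArray());
  }

  static Object deserialize(String encoded) throws IOException, ClassNotFoundException {
    ByteArrayInputStream bis =
        new ByteArrayInputStream(Base64.getDecoder().decode(encoded));
    try (ObjectInputStream ois = new ObjectInputStream(bis)) {
      return ois.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    // Submission side: the table info is stored once, at the job level.
    Map<String, String> conf = new HashMap<>();
    conf.put("job.info", serialize(new TableInfo("id:int,name:string")));

    // Task side: the split arrives with its transient field unset ...
    Split split = new Split();
    // ... so it is restored from the single job-level copy before use.
    if (split.tableInfo == null) {
      split.tableInfo = (TableInfo) deserialize(conf.get("job.info"));
    }
    System.out.println(split.tableInfo.schema);
  }
}
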
http://git-wip-us.apache.org/repos/asf/hive/blob/18fb4601/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java
index bcedb3a..0aa498a 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 
 import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -44,11 +43,6 @@ public class HCatSplit extends InputSplit
   /** The split returned by the underlying InputFormat split. */
   private org.apache.hadoop.mapred.InputSplit baseMapRedSplit;
 
-  /** The schema for the HCatTable */
-  private HCatSchema tableSchema;
-
-  private HiveConf hiveConf;
-
   /**
    * Instantiates a new hcat split.
    */
@@ -60,16 +54,13 @@ public class HCatSplit extends InputSplit
    *
    * @param partitionInfo the partition info
    * @param baseMapRedSplit the base mapred split
-   * @param tableSchema the table level schema
    */
   public HCatSplit(PartInfo partitionInfo,
-           org.apache.hadoop.mapred.InputSplit baseMapRedSplit,
-           HCatSchema tableSchema) {
+           org.apache.hadoop.mapred.InputSplit baseMapRedSplit) {
 
     this.partitionInfo = partitionInfo;
     // dataSchema can be obtained from partitionInfo.getPartitionSchema()
     this.baseMapRedSplit = baseMapRedSplit;
-    this.tableSchema = tableSchema;
   }
 
   /**
@@ -101,7 +92,8 @@ public class HCatSplit extends InputSplit
    * @return the table schema
    */
   public HCatSchema getTableSchema() {
-    return this.tableSchema;
+    assert this.partitionInfo.getTableInfo() != null : "TableInfo should have been set at this point.";
+    return this.partitionInfo.getTableInfo().getAllColumns();
   }
 
   /* (non-Javadoc)
@@ -159,9 +151,6 @@ public class HCatSplit extends InputSplit
     } catch (Exception e) {
       throw new IOException("Exception from " + baseSplitClassName, e);
     }
-
-    String tableSchemaString = WritableUtils.readString(input);
-    tableSchema = (HCatSchema) HCatUtil.deserialize(tableSchemaString);
   }
 
   /* (non-Javadoc)
@@ -178,10 +167,6 @@ public class HCatSplit extends InputSplit
     Writable baseSplitWritable = (Writable) baseMapRedSplit;
     //write  baseSplit into output
     baseSplitWritable.write(output);
-
-    //write the table schema into output
-    String tableSchemaString = HCatUtil.serialize(tableSchema);
-    WritableUtils.writeString(output, tableSchemaString);
   }
 
 }
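
With the table schema gone from write() and readFields(), each serialized split no longer repeats a schema blob whose total cost scaled with schema size times split count. A rough, self-contained sketch of the effect, using a plain DataOutputStream and an illustrative schema string (sizes are indicative only):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SplitPayloadSketch {
  public static void main(String[] args) throws IOException {
    // Illustrative stand-in for a serialized table schema.
    String schema = "{\"fields\":[{\"name\":\"id\",\"type\":\"int\"},"
        + "{\"name\":\"name\",\"type\":\"string\"}]}";
    int numSplits = 10000;

    // Old shape: every split's write() repeats the schema.
    ByteArrayOutputStream perSplit = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(perSplit)) {
      for (int i = 0; i < numSplits; i++) {
        out.writeLong(i);          // stand-in for the base split's own payload
        out.writeUTF(schema);      // the repetition HIVE-9845 removes
      }
    }

    // New shape: the schema travels once, at the job level.
    ByteArrayOutputStream once = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(once)) {
      out.writeUTF(schema);
      for (int i = 0; i < numSplits; i++) {
        out.writeLong(i);
      }
    }

    System.out.println("schema per split: " + perSplit.size() + " bytes");
    System.out.println("schema once:      " + once.size() + " bytes");
  }
}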

http://git-wip-us.apache.org/repos/asf/hive/blob/18fb4601/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java
index 13faf15..14c93ab 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java
@@ -21,10 +21,13 @@ package org.apache.hive.hcatalog.mapreduce;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
 
 /**
@@ -112,6 +115,15 @@ public class HCatTableInfo implements Serializable {
   }
 
   /**
+   * @return HCatSchema with all columns (i.e. data and partition columns).
+   */
+  public HCatSchema getAllColumns() {
+    List<HCatFieldSchema> allColumns = Lists.newArrayList(dataColumns.getFields());
+    allColumns.addAll(partitionColumns.getFields());
+    return new HCatSchema(allColumns);
+  }
+
+  /**
    * @return the storerInfo
    */
   public StorerInfo getStorerInfo() {

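The new getAllColumns() returns the data columns followed by the partition columns, in that order. A tiny sketch of the same concatenation contract, with plain strings standing in for HCatFieldSchema:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AllColumnsSketch {
  public static void main(String[] args) {
    List<String> dataColumns = Arrays.asList("id", "name");
    List<String> partitionColumns = Arrays.asList("dt", "region");

    // Copy first so neither source list is mutated, mirroring
    // Lists.newArrayList(dataColumns) followed by addAll(partitionColumns).
    List<String> allColumns = new ArrayList<>(dataColumns);
    allColumns.addAll(partitionColumns);

    System.out.println(allColumns); // [id, name, dt, region]
  }
}
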
http://git-wip-us.apache.org/repos/asf/hive/blob/18fb4601/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
index 360e77b..1f23f3f 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
@@ -182,5 +182,10 @@ public class InputJobInfo implements Serializable {
     ObjectInputStream partInfoReader =
       new ObjectInputStream(new InflaterInputStream(ois));
     partitions = (List<PartInfo>)partInfoReader.readObject();
+    for (PartInfo partInfo : partitions) {
+      if (partInfo.getTableInfo() == null) {
+        partInfo.setTableInfo(this.tableInfo);
+      }
+    }
   }
 }

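Since each PartInfo now skips serializing its TableInfo (see the PartInfo change below), readObject() re-attaches the single shared TableInfo to every deserialized partition. A self-contained sketch of this back-fill pattern, again with illustrative stand-in classes rather than the real HCatalog types:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class BackFillSketch {
  static class TableInfo implements Serializable {
    final String name;
    TableInfo(String name) { this.name = name; }
  }

  static class PartInfo implements Serializable {
    transient TableInfo tableInfo;   // shared; never written per partition
  }

  static class JobInfo implements Serializable {
    TableInfo tableInfo = new TableInfo("demo_table");
    List<PartInfo> partitions = new ArrayList<>();

    // After default deserialization, restore the one shared reference
    // into every child that skipped serializing it.
    private void readObject(ObjectInputStream ois)
        throws IOException, ClassNotFoundException {
      ois.defaultReadObject();
      for (PartInfo p : partitions) {
        if (p.tableInfo == null) {
          p.tableInfo = this.tableInfo;
        }
      }
    }
  }

  public static void main(String[] args) throws Exception {
    JobInfo job = new JobInfo();
    job.partitions.add(new PartInfo());

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(job);
    }
    ObjectInputStream ois =
        new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
    JobInfo copy = (JobInfo) ois.readObject();

    System.out.println(copy.partitions.get(0).tableInfo.name); // demo_table
  }
}
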
http://git-wip-us.apache.org/repos/asf/hive/blob/18fb4601/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java
index 651a9a0..fca0a92 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java
@@ -18,27 +18,32 @@
  */
 package org.apache.hive.hcatalog.mapreduce;
 
+import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** The Class used to serialize the partition information read from the metadata server that maps to a partition. */
 public class PartInfo implements Serializable {
 
+  private static Logger LOG = LoggerFactory.getLogger(PartInfo.class);
   /** The serialization version */
   private static final long serialVersionUID = 1L;
 
-  /** The partition schema. */
-  private final HCatSchema partitionSchema;
+  /** The partition data-schema. */
+  private HCatSchema partitionSchema;
 
   /** The information about which input storage handler to use */
-  private final String storageHandlerClassName;
-  private final String inputFormatClassName;
-  private final String outputFormatClassName;
-  private final String serdeClassName;
+  private String storageHandlerClassName;
+  private String inputFormatClassName;
+  private String outputFormatClassName;
+  private String serdeClassName;
 
   /** HCat-specific properties set at the partition */
   private final Properties hcatProperties;
@@ -52,8 +57,11 @@ public class PartInfo implements Serializable {
  /** Job properties associated with this partition */
   Map<String, String> jobProperties;
 
-  /** the table info associated with this partition */
-  HCatTableInfo tableInfo;
+  /**
+   * The table info associated with this partition.
+   * Not serialized per PartInfo instance. Constant, per table.
+   */
+  transient HCatTableInfo tableInfo;
 
   /**
    * Instantiates a new hcat partition info.
@@ -162,4 +170,97 @@ public class PartInfo implements Serializable {
   public HCatTableInfo getTableInfo() {
     return tableInfo;
   }
+
+  void setTableInfo(HCatTableInfo thatTableInfo) {
+    this.tableInfo = thatTableInfo;
+
+    if (partitionSchema == null) {
+      partitionSchema = tableInfo.getDataColumns();
+    }
+
+    if (storageHandlerClassName == null) {
+      storageHandlerClassName = tableInfo.getStorerInfo().getStorageHandlerClass();
+    }
+
+    if (inputFormatClassName == null) {
+      inputFormatClassName = tableInfo.getStorerInfo().getIfClass();
+    }
+
+    if (outputFormatClassName == null) {
+      outputFormatClassName = tableInfo.getStorerInfo().getOfClass();
+    }
+
+    if (serdeClassName == null) {
+      serdeClassName = tableInfo.getStorerInfo().getSerdeClass();
+    }
+  }
+
+  /**
+   * Serialization method. Suppresses serialization of redundant information that's already
+   * available from TableInfo.
+   */
+  private void writeObject(ObjectOutputStream oos)
+      throws IOException {
+    // Suppress commonality with TableInfo.
+
+    assert tableInfo != null : "TableInfo can't be null at this point.";
+
+    if (partitionSchema != null) {
+      if (partitionSchema.equals(tableInfo.getDataColumns())) {
+        partitionSchema = null;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Can't suppress data-schema. Partition-schema and 
table-schema seem to differ! "
+              + " partitionSchema: " + partitionSchema.getFields()
+              + " tableSchema: " + tableInfo.getDataColumns());
+        }
+      }
+    }
+
+    if (storageHandlerClassName != null) {
+      if (storageHandlerClassName.equals(tableInfo.getStorerInfo().getStorageHandlerClass())) {
+        storageHandlerClassName = null;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Partition's storageHandler (" + storageHandlerClassName + 
") " +
+              "differs from table's storageHandler (" + 
tableInfo.getStorerInfo().getStorageHandlerClass() + ").");
+        }
+      }
+    }
+
+    if (inputFormatClassName != null) {
+      if (inputFormatClassName.equals(tableInfo.getStorerInfo().getIfClass())) {
+        inputFormatClassName = null;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Partition's InputFormat (" + inputFormatClassName + ") " +
+              "differs from table's InputFormat (" + 
tableInfo.getStorerInfo().getIfClass() + ").");
+        }
+      }
+    }
+
+    if (outputFormatClassName != null) {
+      if (outputFormatClassName.equals(tableInfo.getStorerInfo().getOfClass())) {
+        outputFormatClassName = null;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Partition's OutputFormat (" + outputFormatClassName + ") 
" +
+              "differs from table's OutputFormat (" + 
tableInfo.getStorerInfo().getOfClass() + ").");
+        }
+      }
+    }
+
+    if (serdeClassName != null) {
+      if (serdeClassName.equals(tableInfo.getStorerInfo().getSerdeClass())) {
+        serdeClassName = null;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Partition's SerDe (" + serdeClassName + ") " +
+              "differs from table's SerDe (" + 
tableInfo.getStorerInfo().getSerdeClass() + ").");
+        }
+      }
+    }
+
+    oos.defaultWriteObject();
+  }
 }

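The writeObject() above nulls out any PartInfo field whose value merely repeats the table-level default, so only genuinely partition-specific values are serialized; setTableInfo() then restores the defaults after deserialization. A condensed, self-contained sketch of that round trip for a single field (stand-in classes, not the real HCatalog types):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SuppressionSketch {
  static class TableInfo implements Serializable {
    final String inputFormat = "org.example.DemoInputFormat";
  }

  static class PartInfo implements Serializable {
    transient TableInfo tableInfo;
    String inputFormatClassName;

    // Write side: suppress values that duplicate the table-level default.
    private void writeObject(ObjectOutputStream oos) throws IOException {
      if (inputFormatClassName != null
          && inputFormatClassName.equals(tableInfo.inputFormat)) {
        inputFormatClassName = null;
      }
      oos.defaultWriteObject();
    }

    // Read side: fall back to the table default for suppressed fields.
    void setTableInfo(TableInfo tableInfo) {
      this.tableInfo = tableInfo;
      if (inputFormatClassName == null) {
        inputFormatClassName = tableInfo.inputFormat;
      }
    }
  }

  public static void main(String[] args) throws Exception {
    TableInfo table = new TableInfo();
    PartInfo part = new PartInfo();
    part.setTableInfo(table);   // part now repeats the table's input format

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(part);    // the repeated value is dropped from the wire
    }
    PartInfo copy = (PartInfo) new ObjectInputStream(
        new ByteArrayInputStream(bos.toByteArray())).readObject();

    copy.setTableInfo(table);   // defaults restored after deserialization
    System.out.println(copy.inputFormatClassName);
  }
}

Note that, as in the patch itself, the suppression mutates the in-memory object as a side effect of serialization; the writer's copy keeps the null field until setTableInfo() is invoked again.
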
http://git-wip-us.apache.org/repos/asf/hive/blob/18fb4601/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
index add9d41..f716da9 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Lists;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
@@ -106,7 +107,7 @@ public class TestHCatOutputFormat extends TestCase {
     tbl.setDbName(dbName);
     tbl.setTableName(tblName);
     StorageDescriptor sd = new StorageDescriptor();
-    sd.setCols(fields);
+    sd.setCols(Lists.newArrayList(new FieldSchema("data_column", serdeConstants.STRING_TYPE_NAME, "")));
     tbl.setSd(sd);
 
     //sd.setLocation("hdfs://tmp");
@@ -151,7 +152,7 @@ public class TestHCatOutputFormat extends TestCase {
     assertEquals(1, jobInfo.getPartitionValues().size());
     assertEquals("p1", jobInfo.getPartitionValues().get("colname"));
     assertEquals(1, jobInfo.getTableInfo().getDataColumns().getFields().size());
-    assertEquals("colname", jobInfo.getTableInfo().getDataColumns().getFields().get(0).getName());
+    assertEquals("data_column", jobInfo.getTableInfo().getDataColumns().getFields().get(0).getName());
 
     publishTest(job);
   }
