Author: xuefu
Date: Tue Dec 30 20:56:26 2014
New Revision: 1648598

URL: http://svn.apache.org/r1648598
Log:
HIVE-8920: IOContext problem with multiple MapWorks cloned for multi-insert 
[Spark Branch]

Modified:
    hive/branches/spark/itests/src/test/resources/testconfiguration.properties
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java

Modified: 
hive/branches/spark/itests/src/test/resources/testconfiguration.properties
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/itests/src/test/resources/testconfiguration.properties?rev=1648598&r1=1648597&r2=1648598&view=diff
==============================================================================
--- hive/branches/spark/itests/src/test/resources/testconfiguration.properties 
(original)
+++ hive/branches/spark/itests/src/test/resources/testconfiguration.properties 
Tue Dec 30 20:56:26 2014
@@ -741,6 +741,7 @@ spark.query.files=add_part_multiple.q, \
   multi_insert_mixed.q, \
   multi_insert_move_tasks_share_dependencies.q, \
   multi_join_union.q, \
+  multi_join_union_src.q, \
   multigroupby_singlemr.q, \
   optimize_nullscan.q, \
   order.q, \

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java?rev=1648598&r1=1648597&r2=1648598&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
 Tue Dec 30 20:56:26 2014
@@ -30,7 +30,6 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
-import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
@@ -60,8 +59,6 @@ public class SparkMapRecordHandler exten
   private static final String PLAN_KEY = "__MAP_PLAN__";
   private MapOperator mo;
   public static final Log l4j = LogFactory.getLog(SparkMapRecordHandler.class);
-  private boolean done;
-
   private MapredLocalWork localWork = null;
   private boolean isLogInfoEnabled = false;
   private ExecMapperContext execContext;
@@ -91,13 +88,6 @@ public class SparkMapRecordHandler exten
       }
       mo.setConf(mrwork);
 
-      // If the current thread's IOContext is not initialized (because it's 
reading from a
-      // cached input HadoopRDD), copy from the saved result.
-      IOContext ioContext = IOContext.get();
-      if (ioContext.getInputPath() == null) {
-        IOContext.copy(ioContext, 
IOContext.getMap().get(SparkUtilities.MAP_IO_CONTEXT));
-      }
-
       // initialize map operator
       mo.setChildren(job);
       l4j.info(mo.dump(0));
@@ -211,10 +201,6 @@ public class SparkMapRecordHandler exten
     } finally {
       MapredContext.close();
       Utilities.clearWorkMap();
-
-      // It's possible that a thread get reused for different queries, so we 
need to
-      // reset the input path.
-      IOContext.get().setInputPath(null);
     }
   }
 

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1648598&r1=1648597&r2=1648598&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
 Tue Dec 30 20:56:26 2014
@@ -175,7 +175,8 @@ public class SparkPlanGenerator {
 
     JavaPairRDD<WritableComparable, Writable> hadoopRDD = 
sc.hadoopRDD(jobConf, ifClass,
         WritableComparable.class, Writable.class);
-    MapInput result = new MapInput(sparkPlan, hadoopRDD, 
cloneToWork.containsKey(mapWork));
+    // Caching is disabled for MapInput due to HIVE-8920
+    MapInput result = new MapInput(sparkPlan, hadoopRDD, 
false/*cloneToWork.containsKey(mapWork)*/);
     return result;
   }
 

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1648598&r1=1648597&r2=1648598&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
 Tue Dec 30 20:56:26 2014
@@ -36,9 +36,6 @@ import org.apache.hadoop.io.BytesWritabl
  */
 public class SparkUtilities {
 
-  // Used to save and retrieve IOContext for multi-insertion.
-  public static final String MAP_IO_CONTEXT = "MAP_IO_CONTEXT";
-
   public static HiveKey copyHiveKey(HiveKey key) {
     HiveKey copy = new HiveKey();
     copy.setDistKeyLength(key.getDistKeyLength());

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1648598&r1=1648597&r2=1648598&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
 Tue Dec 30 20:56:26 2014
@@ -27,11 +27,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.FooterBuffer;
-import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.io.IOContext.Comparison;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -173,18 +171,6 @@ public abstract class HiveContextAwareRe
     ioCxtRef.isBlockPointer = isBlockPointer;
     ioCxtRef.inputPath = inputPath;
     LOG.info("Processing file " + inputPath);
-
-    // In spark, in multi-insert an input HadoopRDD maybe be shared by multiple
-    // mappers, and if we cache it, only the first thread will have its 
thread-local
-    // IOContext initialized, while the rest will not.
-    // To solve this issue, we need to save a copy of the initialized 
IOContext, so that
-    // later it can be used for other threads.
-    if (HiveConf.getVar(jobConf, 
HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
-      IOContext iocontext = new IOContext();
-      IOContext.copy(iocontext, ioCxtRef);
-      IOContext.getMap().put(SparkUtilities.MAP_IO_CONTEXT, iocontext);
-    }
-
     initDone = true;
   }
 

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1648598&r1=1648597&r2=1648598&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java 
(original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java 
Tue Dec 30 20:56:26 2014
@@ -25,9 +25,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
-import org.apache.hadoop.hive.ql.session.SessionState;
-
 
 /**
  * IOContext basically contains the position information of the current
@@ -45,11 +42,8 @@ public class IOContext {
  };
 
   private static Map<String, IOContext> inputNameIOContextMap = new 
HashMap<String, IOContext>();
-  public static Map<String, IOContext> getMap() {
-    return inputNameIOContextMap;
-  }
 
-  public static IOContext get() {
+  private static IOContext get() {
     return IOContext.threadLocal.get();
   }
 
@@ -109,27 +103,6 @@ public class IOContext {
     this.ioExceptions = false;
   }
 
-  /**
-   * Copy all fields values from orig to dest, all existing fields in dest 
will be overwritten.
-   *
-   * @param dest the IOContext to copy to
-   * @param orig the IOContext to copy from
-   */
-  public static void copy(IOContext dest, IOContext orig) {
-    dest.currentBlockStart = orig.currentBlockStart;
-    dest.nextBlockStart = orig.nextBlockStart;
-    dest.currentRow = orig.currentRow;
-    dest.isBlockPointer = orig.isBlockPointer;
-    dest.ioExceptions = orig.ioExceptions;
-    dest.useSorted = orig.useSorted;
-    dest.isBinarySearching = orig.isBinarySearching;
-    dest.endBinarySearch = orig.endBinarySearch;
-    dest.comparison = orig.comparison;
-    dest.genericUDFClassName = orig.genericUDFClassName;
-    dest.ri = orig.ri;
-    dest.inputPath = orig.inputPath;
-  }
-
   public long getCurrentBlockStart() {
     return currentBlockStart;
   }


Reply via email to