Repository: spark
Updated Branches:
  refs/heads/branch-1.5 5491dfb9a -> e902c4f26


[SPARK-8057][Core] Call TaskAttemptContext.getTaskAttemptID using Reflection

Users may run the Spark core jar from the Maven repository against Hadoop 1.
SPARK-2075 already resolved the compatibility issue to support this, but a
recent change to `SparkHadoopMapRedUtil.commitTask` broke it.

This PR fixes the compatibility issue by using reflection to call
`TaskAttemptContext.getTaskAttemptID`.
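
For context, here is a minimal, self-contained sketch of the reflection trick
(the import aliases match those in the diff below; the standalone method name
is illustrative):

    import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
    import org.apache.hadoop.mapreduce.{TaskAttemptID => MapReduceTaskAttemptID}

    // A direct call to context.getTaskAttemptID compiles to invokevirtual
    // against Hadoop 1.+ (where TaskAttemptContext is a class) but to
    // invokeinterface against Hadoop 2.+ (where it is an interface), so a
    // jar built against one version fails on the other with
    // IncompatibleClassChangeError. Method.invoke defers dispatch to
    // runtime, so one binary works with both.
    def getTaskAttemptID(context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = {
      val method = context.getClass.getMethod("getTaskAttemptID")
      method.invoke(context).asInstanceOf[MapReduceTaskAttemptID]
    }

The Method lookup happens on every call; since commitTask runs once per task,
the reflection overhead is negligible.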

Author: zsxwing <zsxw...@gmail.com>

Closes #6599 from zsxwing/SPARK-8057 and squashes the following commits:

f7a343c [zsxwing] Remove the redundant import
6b7f1af [zsxwing] Call TaskAttemptContext.getTaskAttemptID using Reflection

(cherry picked from commit 672f467668da1cf20895ee57652489c306120288)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e902c4f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e902c4f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e902c4f2

Branch: refs/heads/branch-1.5
Commit: e902c4f26fdaa505ea2c57c701b72b1f5d2d8b70
Parents: 5491dfb
Author: zsxwing <zsxw...@gmail.com>
Authored: Thu Aug 6 21:42:42 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Aug 6 21:42:55 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkHadoopUtil.scala     | 14 ++++++++++++++
 .../apache/spark/mapred/SparkHadoopMapRedUtil.scala   |  3 ++-
 2 files changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e902c4f2/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index e06b06e..7e9dba4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -34,6 +34,8 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
+import org.apache.hadoop.mapreduce.{TaskAttemptID => MapReduceTaskAttemptID}
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.annotation.DeveloperApi
@@ -195,6 +197,18 @@ class SparkHadoopUtil extends Logging {
   }
 
   /**
+   * Uses reflection to call `getTaskAttemptID` on a TaskAttemptContext. If we called
+   * `TaskAttemptContext.getTaskAttemptID` directly, it would generate different bytecode
+   * for Hadoop 1.+ and Hadoop 2.+, because TaskAttemptContext is a class in Hadoop 1.+
+   * but an interface in Hadoop 2.+.
+   */
+  def getTaskAttemptIDFromTaskAttemptContext(
+      context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = {
+    val method = context.getClass.getMethod("getTaskAttemptID")
+    method.invoke(context).asInstanceOf[MapReduceTaskAttemptID]
+  }
+
+  /**
    * Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
    * given path points to a file, return a single-element collection containing [[FileStatus]] of
    * that file.

http://git-wip-us.apache.org/repos/asf/spark/blob/e902c4f2/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 87df427..f405b73 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
 import org.apache.hadoop.mapreduce.{OutputCommitter => MapReduceOutputCommitter}
 
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.{Logging, SparkEnv, TaskContext}
 import org.apache.spark.util.{Utils => SparkUtils}
@@ -93,7 +94,7 @@ object SparkHadoopMapRedUtil extends Logging {
       splitId: Int,
       attemptId: Int): Unit = {
 
-    val mrTaskAttemptID = mrTaskContext.getTaskAttemptID
+    val mrTaskAttemptID = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext)
 
     // Called after we have decided to commit
     def performCommit(): Unit = {
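
For downstream code hitting the same binary incompatibility, the new helper is
reachable through the SparkHadoopUtil singleton (a usage sketch; `mrTaskContext`
stands for whatever mapreduce TaskAttemptContext is in scope):

    import org.apache.spark.deploy.SparkHadoopUtil

    // Resolves getTaskAttemptID reflectively, so the same bytecode runs on
    // both Hadoop 1.+ and Hadoop 2.+ classpaths.
    val attemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext)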

