Repository: spark
Updated Branches:
  refs/heads/master 0d8cdf0ed -> 5b922bb45
[SPARK-3543] Clean up Java TaskContext implementation.

This addresses some minor issues in https://github.com/apache/spark/pull/2425

Author: Reynold Xin <r...@apache.org>

Closes #2557 from rxin/TaskContext and squashes the following commits:

a51e5f6 [Reynold Xin] [SPARK-3543] Clean up Java TaskContext implementation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b922bb4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b922bb4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b922bb4

Branch: refs/heads/master
Commit: 5b922bb458e863f5be0ae68167de882743f70b86
Parents: 0d8cdf0
Author: Reynold Xin <r...@apache.org>
Authored: Sat Sep 27 14:46:00 2014 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Sat Sep 27 14:46:00 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/spark/TaskContext.java | 33 +++++++++-----------
 .../apache/spark/scheduler/DAGScheduler.scala   |  2 +-
 .../org/apache/spark/scheduler/ResultTask.scala |  6 +---
 .../apache/spark/scheduler/ShuffleMapTask.scala |  2 --
 .../scala/org/apache/spark/scheduler/Task.scala |  8 +++--
 5 files changed, 22 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5b922bb4/core/src/main/java/org/apache/spark/TaskContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/TaskContext.java b/core/src/main/java/org/apache/spark/TaskContext.java
index 09b8ce0..4e6d708 100644
--- a/core/src/main/java/org/apache/spark/TaskContext.java
+++ b/core/src/main/java/org/apache/spark/TaskContext.java
@@ -56,7 +56,7 @@ public class TaskContext implements Serializable {
    * @param taskMetrics performance metrics of the task
    */
   @DeveloperApi
-  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally,
+  public TaskContext(int stageId, int partitionId, long attemptId, boolean runningLocally,
       TaskMetrics taskMetrics) {
     this.attemptId = attemptId;
     this.partitionId = partitionId;
@@ -65,7 +65,6 @@ public class TaskContext implements Serializable {
     this.taskMetrics = taskMetrics;
   }
 
-
   /**
    * :: DeveloperApi ::
    * Contextual information about a task which can be read or mutated during execution.
@@ -76,8 +75,7 @@ public class TaskContext implements Serializable {
    * @param runningLocally whether the task is running locally in the driver JVM
    */
   @DeveloperApi
-  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
-      Boolean runningLocally) {
+  public TaskContext(int stageId, int partitionId, long attemptId, boolean runningLocally) {
     this.attemptId = attemptId;
     this.partitionId = partitionId;
     this.runningLocally = runningLocally;
@@ -85,7 +83,6 @@ public class TaskContext implements Serializable {
     this.taskMetrics = TaskMetrics.empty();
   }
 
-
   /**
    * :: DeveloperApi ::
    * Contextual information about a task which can be read or mutated during execution.
@@ -95,7 +92,7 @@ public class TaskContext implements Serializable {
    * @param attemptId the number of attempts to execute this task
    */
   @DeveloperApi
-  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) {
+  public TaskContext(int stageId, int partitionId, long attemptId) {
     this.attemptId = attemptId;
     this.partitionId = partitionId;
     this.runningLocally = false;
@@ -107,9 +104,9 @@ public class TaskContext implements Serializable {
       new ThreadLocal<TaskContext>();
 
   /**
-    * :: Internal API ::
-    * This is spark internal API, not intended to be called from user programs.
-    */
+   * :: Internal API ::
+   * This is spark internal API, not intended to be called from user programs.
+   */
   public static void setTaskContext(TaskContext tc) {
     taskContext.set(tc);
   }
 
@@ -118,10 +115,8 @@ public class TaskContext implements Serializable {
     return taskContext.get();
   }
 
-  /**
-   * :: Internal API ::
-   */
-  public static void remove() {
+  /** :: Internal API :: */
+  public static void unset() {
     taskContext.remove();
   }
 
@@ -130,22 +125,22 @@ public class TaskContext implements Serializable {
       new ArrayList<TaskCompletionListener>();
 
   // Whether the corresponding task has been killed.
-  private volatile Boolean interrupted = false;
+  private volatile boolean interrupted = false;
 
   // Whether the task has completed.
-  private volatile Boolean completed = false;
+  private volatile boolean completed = false;
 
   /**
    * Checks whether the task has completed.
    */
-  public Boolean isCompleted() {
+  public boolean isCompleted() {
     return completed;
   }
 
   /**
    * Checks whether the task has been killed.
    */
-  public Boolean isInterrupted() {
+  public boolean isInterrupted() {
     return interrupted;
   }
 
@@ -246,12 +241,12 @@ public class TaskContext implements Serializable {
   }
 
   @Deprecated
-  /** Deprecated: use getRunningLocally() */
+  /** Deprecated: use isRunningLocally() */
   public boolean runningLocally() {
     return runningLocally;
   }
 
-  public boolean getRunningLocally() {
+  public boolean isRunningLocally() {
     return runningLocally;
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5b922bb4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 32cf29e..70c235d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -641,7 +641,7 @@ class DAGScheduler(
         job.listener.taskSucceeded(0, result)
       } finally {
         taskContext.markTaskCompleted()
-        TaskContext.remove()
+        TaskContext.unset()
       }
     } catch {
       case e: Exception =>

http://git-wip-us.apache.org/repos/asf/spark/blob/5b922bb4/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 2ccbd8e..4a9ff91 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -58,11 +58,7 @@ private[spark] class ResultTask[T, U](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
     metrics = Some(context.taskMetrics)
-    try {
-      func(context, rdd.iterator(partition, context))
-    } finally {
-      context.markTaskCompleted()
-    }
+    func(context, rdd.iterator(partition, context))
   }
 
   // This is only callable on the driver side.

http://git-wip-us.apache.org/repos/asf/spark/blob/5b922bb4/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index a98ee11..7970908 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -78,8 +78,6 @@ private[spark] class ShuffleMapTask(
             log.debug("Could not stop writer", e)
         }
         throw e
-    } finally {
-      context.markTaskCompleted()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5b922bb4/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index bf73f6f..c6e47c8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -52,7 +52,12 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
     if (_killed) {
       kill(interruptThread = false)
     }
-    runTask(context)
+    try {
+      runTask(context)
+    } finally {
+      context.markTaskCompleted()
+      TaskContext.unset()
+    }
   }
 
   def runTask(context: TaskContext): T
@@ -93,7 +98,6 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
     if (interruptThread && taskThread != null) {
      taskThread.interrupt()
     }
-    TaskContext.remove()
   }
 }
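
For illustration, a minimal Scala sketch of the lifecycle this patch centralizes in Task.run(): install the thread-local context, run the task body, then always mark the task completed and unset the context, even when the body throws. The wrapper object, method name, and dummy body below are hypothetical and not part of Spark; only TaskContext members visible in the diff above (the three-argument constructor, setTaskContext, markTaskCompleted, unset, isRunningLocally, isCompleted) are assumed.

import org.apache.spark.TaskContext

object TaskContextLifecycleSketch {

  // Hypothetical wrapper mirroring what Task.run() now does for real tasks.
  def runWithTaskContext[T](stageId: Int, partitionId: Int, attemptId: Long)(body: TaskContext => T): T = {
    // The three-argument constructor defaults runningLocally to false.
    val context = new TaskContext(stageId, partitionId, attemptId)
    TaskContext.setTaskContext(context)  // internal API: publish the context to this thread
    try {
      body(context)
    } finally {
      context.markTaskCompleted()        // always runs, even if the body throws
      TaskContext.unset()                // renamed from remove() in this commit
    }
  }

  def main(args: Array[String]): Unit = {
    val summary = runWithTaskContext(stageId = 1, partitionId = 0, attemptId = 0L) { ctx =>
      s"runningLocally=${ctx.isRunningLocally()} completedYet=${ctx.isCompleted()}"
    }
    // completedYet is false inside the body; markTaskCompleted() flips it in the finally block.
    println(summary)
  }
}

Owning this try/finally in Task.run() is what lets the ResultTask and ShuffleMapTask hunks above drop their own markTaskCompleted() calls without changing behavior.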