[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-26 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r18121901
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private int stageId;
+  private int partitionId;
+  private long attemptId;
+  private boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+  }
+
+  private static ThreadLocal taskContext =
+new ThreadLocal();
+
+  /**
+  * :: Internal API ::
+  * This is spark internal API, not intended to be called from user 
programs.
+  */
+  public static void setTaskContext(TaskContext tc) {
+taskContext.set(tc);
+  }
+
+  public static TaskContext get() {
+return taskContext.get();
+  }
+
+  /** 
+  * :: Internal API ::
+  */
+  public static void remove() {
+taskContext.remove();
+  }
+
+  // List of callback functions to execute when the task completes.
+  private transient List onCompleteCallbacks =
+new ArrayLi

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/2425


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-26 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r18121892
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private int stageId;
+  private int partitionId;
+  private long attemptId;
+  private boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
--- End diff --

fyi all these should be primitives rather than boxed 
Integers/Longs/Booleans. I'm going to fix it in a separate pr.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-26 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-57042070
  
Ok I'm merging this. I will push some minor cleanups after merging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-26 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r18121869
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
@@ -45,7 +45,8 @@ import org.apache.spark.util.Utils
 private[spark] abstract class Task[T](val stageId: Int, var partitionId: 
Int) extends Serializable {
 
   final def run(attemptId: Long): T = {
-context = new TaskContext(stageId, partitionId, attemptId, 
runningLocally = false)
+context = new TaskContext(stageId, partitionId, attemptId, false)
--- End diff --

if the parameter is defined in java it cannot be named


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-56952085
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20856/consoleFull)
 for   PR 2425 at commit 
[`8ae414c`](https://github.com/apache/spark/commit/8ae414c1ff2af5328cac7cc36b28e66f3aa6647d).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-56952094
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20856/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-26 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r18083933
  
--- Diff: core/src/test/scala/org/apache/spark/CacheManagerSuite.scala ---
@@ -94,7 +94,7 @@ class CacheManagerSuite extends FunSuite with 
BeforeAndAfter with EasyMockSugar
 }
 
 whenExecuting(blockManager) {
-  val context = new TaskContext(0, 0, 0, runningLocally = true)
+  val context = new TaskContext(0, 0, 0, true)
--- End diff --

These were breaking compilations..


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-56946632
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20856/consoleFull)
 for   PR 2425 at commit 
[`8ae414c`](https://github.com/apache/spark/commit/8ae414c1ff2af5328cac7cc36b28e66f3aa6647d).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-56946056
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20855/consoleFull)
 for   PR 2425 at commit 
[`5f8555b`](https://github.com/apache/spark/commit/5f8555b7742c3818fa38e57dea477cd2a9636711).
 * This patch **fails** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-56946058
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20855/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-56945767
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20855/consoleFull)
 for   PR 2425 at commit 
[`5f8555b`](https://github.com/apache/spark/commit/5f8555b7742c3818fa38e57dea477cd2a9636711).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-56944743
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20854/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-56944740
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20854/consoleFull)
 for   PR 2425 at commit 
[`155bc3b`](https://github.com/apache/spark/commit/155bc3b966b7b8960e09375275ef291d23feed53).
 * This patch **fails** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-5698
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20854/consoleFull)
 for   PR 2425 at commit 
[`155bc3b`](https://github.com/apache/spark/commit/155bc3b966b7b8960e09375275ef291d23feed53).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-24 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-56776648
  
@ScrapCodes will you have time to address the feedback?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-20 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-56286065
  
@ScrapCodes made another pass with some comments. Overall this is looking 
good


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-20 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17822812
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
@@ -45,7 +45,8 @@ import org.apache.spark.util.Utils
 private[spark] abstract class Task[T](val stageId: Int, var partitionId: 
Int) extends Serializable {
 
   final def run(attemptId: Long): T = {
-context = new TaskContext(stageId, partitionId, attemptId, 
runningLocally = false)
+context = new TaskContext(stageId, partitionId, attemptId, false)
+TaskContext.setTaskContext(context)
--- End diff --

Can we also clear this after the task is run? It might be good to expose a 
static `remove` method in the `TaskContext` class. We should also put a warning 
to users never to call that method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-20 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17822804
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private int stageId;
+  private int partitionId;
+  private long attemptId;
+  private boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+  }
+
+  private static ThreadLocal taskContext =
+new ThreadLocal();
+
+  public static void setTaskContext(TaskContext tc) {
--- End diff --

Can you add a warning in the javadoc that explains this should never be 
called by user programs? It would be nice if this could be `private[spark]` but 
unfortunately we are in java land :P


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-20 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17822777
  
--- Diff: core/src/test/scala/org/apache/spark/CacheManagerSuite.scala ---
@@ -94,7 +94,7 @@ class CacheManagerSuite extends FunSuite with 
BeforeAndAfter with EasyMockSugar
 }
 
 whenExecuting(blockManager) {
-  val context = new TaskContext(0, 0, 0, runningLocally = true)
+  val context = new TaskContext(0, 0, 0, true)
--- End diff --

we should name it here too


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-20 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17822775
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
@@ -45,7 +45,8 @@ import org.apache.spark.util.Utils
 private[spark] abstract class Task[T](val stageId: Int, var partitionId: 
Int) extends Serializable {
 
   final def run(attemptId: Long): T = {
-context = new TaskContext(stageId, partitionId, attemptId, 
runningLocally = false)
+context = new TaskContext(stageId, partitionId, attemptId, false)
--- End diff --

any reason to drop the naming for `runningLocally`? the correct style is to 
name all optional paremeters.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-20 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17822735
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private int stageId;
+  private int partitionId;
+  private long attemptId;
+  private boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+  }
+
+  private static ThreadLocal taskContext =
+new ThreadLocal();
+
+  public static void setTaskContext(TaskContext tc) {
+taskContext.set(tc);
+  }
+
+  public static TaskContext get() {
+return taskContext.get();
+  }
+
+  // List of callback functions to execute when the task completes.
+  private transient List onCompleteCallbacks =
+new ArrayList();
+
+  // Whether the corresponding task has been killed.
+  private volatile Boolean interrupted = false;
+
+  // Whether the task has completed.
+  private volatile Boolean completed = false;
+
+  /**
+   * Checks whethe

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-20 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17822738
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private int stageId;
+  private int partitionId;
+  private long attemptId;
+  private boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+  }
+
+  private static ThreadLocal taskContext =
+new ThreadLocal();
+
+  public static void setTaskContext(TaskContext tc) {
+taskContext.set(tc);
+  }
+
+  public static TaskContext get() {
+return taskContext.get();
+  }
+
+  // List of callback functions to execute when the task completes.
+  private transient List onCompleteCallbacks =
+new ArrayList();
+
+  // Whether the corresponding task has been killed.
+  private volatile Boolean interrupted = false;
+
+  // Whether the task has completed.
+  private volatile Boolean completed = false;
+
+  /**
+   * Checks whethe

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-20 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17822726
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private int stageId;
+  private int partitionId;
+  private long attemptId;
+  private boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+  }
+
+  private static ThreadLocal taskContext =
+new ThreadLocal();
+
+  public static void setTaskContext(TaskContext tc) {
+taskContext.set(tc);
+  }
+
+  public static TaskContext get() {
+return taskContext.get();
+  }
+
+  // List of callback functions to execute when the task completes.
+  private transient List onCompleteCallbacks =
+new ArrayList();
+
+  // Whether the corresponding task has been killed.
+  private volatile Boolean interrupted = false;
+
+  // Whether the task has completed.
+  private volatile Boolean completed = false;
+
+  /**
+   * Checks whethe

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-20 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17822725
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private int stageId;
+  private int partitionId;
+  private long attemptId;
+  private boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+  }
+
+  private static ThreadLocal taskContext =
+new ThreadLocal();
+
+  public static void setTaskContext(TaskContext tc) {
+taskContext.set(tc);
+  }
+
+  public static TaskContext get() {
+return taskContext.get();
+  }
+
+  // List of callback functions to execute when the task completes.
+  private transient List onCompleteCallbacks =
+new ArrayList();
+
+  // Whether the corresponding task has been killed.
+  private volatile Boolean interrupted = false;
+
+  // Whether the task has completed.
+  private volatile Boolean completed = false;
+
+  /**
+   * Checks whethe

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-20 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17822722
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private int stageId;
+  private int partitionId;
+  private long attemptId;
+  private boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+  }
+
+  private static ThreadLocal taskContext =
+new ThreadLocal();
+
+  public static void setTaskContext(TaskContext tc) {
+taskContext.set(tc);
+  }
+
+  public static TaskContext get() {
+return taskContext.get();
+  }
+
+  // List of callback functions to execute when the task completes.
+  private transient List onCompleteCallbacks =
+new ArrayList();
+
+  // Whether the corresponding task has been killed.
+  private volatile Boolean interrupted = false;
+
+  // Whether the task has completed.
+  private volatile Boolean completed = false;
+
+  /**
+   * Checks whethe

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-56024795
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20536/consoleFull)
 for   PR 2425 at commit 
[`ee8bd00`](https://github.com/apache/spark/commit/ee8bd009fe8065c4ea018c4ff5baa32378606243).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-56018945
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20536/consoleFull)
 for   PR 2425 at commit 
[`ee8bd00`](https://github.com/apache/spark/commit/ee8bd009fe8065c4ea018c4ff5baa32378606243).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17713666
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private Integer stageId;
+  private Integer partitionId;
+  private Long attemptId;
+  private Boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+  private static ThreadLocal taskContext =
--- End diff --

yes, get must be static. he's only thinking we should move the set to 
somewhere else


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: rev

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17713674
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private Integer stageId;
+  private Integer partitionId;
+  private Long attemptId;
+  private Boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+  private static ThreadLocal taskContext =
--- End diff --

assuming i'm reading his mind correctly :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For addit

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-18 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17713594
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private Integer stageId;
+  private Integer partitionId;
+  private Long attemptId;
+  private Boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+  private static ThreadLocal taskContext =
--- End diff --

But then how would someone access it  ? I hope he wants to retain atleast 
the get method inside TaskContext as static. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17713298
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private Integer stageId;
+  private Integer partitionId;
+  private Long attemptId;
+  private Boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+  private static ThreadLocal taskContext =
+new ThreadLocal();
+
+  public static TaskContext get() {
+return taskContext.get();
+  }
+
+  // List of callback functions to execute when the task completes.
+  private transient List onCompleteCallbacks =
+new ArrayList();
+
+  // Whether the corresponding task has been killed.
+  private volatile Boolean interrupted = false;
+
+  // Whether the task has completed.
+  private volatile Boolean completed = false;
+
+  /**
+   * Checks whether th

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-18 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17713233
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private Integer stageId;
+  private Integer partitionId;
+  private Long attemptId;
+  private Boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+  private static ThreadLocal taskContext =
+new ThreadLocal();
+
+  public static TaskContext get() {
+return taskContext.get();
+  }
+
+  // List of callback functions to execute when the task completes.
+  private transient List onCompleteCallbacks =
+new ArrayList();
+
+  // Whether the corresponding task has been killed.
+  private volatile Boolean interrupted = false;
+
+  // Whether the task has completed.
+  private volatile Boolean completed = false;
+
+  /**
+   * Checks whet

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-18 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17713126
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private Integer stageId;
+  private Integer partitionId;
+  private Long attemptId;
+  private Boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+  private static ThreadLocal taskContext =
--- End diff --

alright


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: revie

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17712911
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private Integer stageId;
+  private Integer partitionId;
+  private Long attemptId;
+  private Boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+  private static ThreadLocal taskContext =
--- End diff --

I think Patrick meant putting it in executor or in Task's run method


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-18 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17712697
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private Integer stageId;
+  private Integer partitionId;
+  private Long attemptId;
+  private Boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+  private static ThreadLocal taskContext =
--- End diff --

But then its already static code ? + Where else should I put it ? I 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubs

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17688175
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private Integer stageId;
+  private Integer partitionId;
+  private Long attemptId;
+  private Boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+  private static ThreadLocal taskContext =
--- End diff --

I would remove this and the below function. I think it's better it the 
`TaskContext` class is not itself aware of being stored as a thread local. We 
can keep all the logic related to thread locals in the static code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17688016
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private Integer stageId;
+  private Integer partitionId;
+  private Long attemptId;
+  private Boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
--- End diff --

I think it might be better to explicitly set this in the executor when we 
create the context rather than do it as side effect of construction. What do 
you think @rxin?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17678577
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private Integer stageId;
+  private Integer partitionId;
+  private Long attemptId;
+  private Boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+  private static ThreadLocal taskContext =
+new ThreadLocal();
+
+  public static TaskContext get() {
+return taskContext.get();
+  }
+
+  // List of callback functions to execute when the task completes.
+  private transient List onCompleteCallbacks =
+new ArrayList();
+
+  // Whether the corresponding task has been killed.
+  private volatile Boolean interrupted = false;
+
+  // Whether the task has completed.
+  private volatile Boolean completed = false;
+
+  /**
+   * Checks whether th

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17678541
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private Integer stageId;
--- End diff --

int, int, long, bool


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17678511
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private Integer stageId;
+  private Integer partitionId;
+  private Long attemptId;
+  private Boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+  private static ThreadLocal taskContext =
+new ThreadLocal();
+
+  public static TaskContext get() {
+return taskContext.get();
+  }
+
+  // List of callback functions to execute when the task completes.
+  private transient List onCompleteCallbacks =
+new ArrayList();
+
+  // Whether the corresponding task has been killed.
+  private volatile Boolean interrupted = false;
+
+  // Whether the task has completed.
+  private volatile Boolean completed = false;
+
+  /**
+   * Checks whether th

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17678497
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private Integer stageId;
+  private Integer partitionId;
+  private Long attemptId;
+  private Boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+  private static ThreadLocal taskContext =
+new ThreadLocal();
+
+  public static TaskContext get() {
+return taskContext.get();
+  }
+
+  // List of callback functions to execute when the task completes.
+  private transient List onCompleteCallbacks =
+new ArrayList();
+
+  // Whether the corresponding task has been killed.
+  private volatile Boolean interrupted = false;
+
+  // Whether the task has completed.
+  private volatile Boolean completed = false;
+
+  /**
+   * Checks whether th

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17678506
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+/**
+* :: DeveloperApi ::
+* Contextual information about a task which can be read or mutated during 
execution.
+*/
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+  private Integer stageId;
+  private Integer partitionId;
+  private Long attemptId;
+  private Boolean runningLocally;
+  private TaskMetrics taskMetrics;
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   * @param taskMetrics performance metrics of the task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId, 
Boolean runningLocally,
+ TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   * @param runningLocally whether the task is running locally in the 
driver JVM
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId,
+ Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+
+  /**
+   * :: DeveloperApi ::
+   * Contextual information about a task which can be read or mutated 
during execution.
+   *
+   * @param stageId stage id
+   * @param partitionId index of the partition
+   * @param attemptId the number of attempts to execute this task
+   */
+  @DeveloperApi
+  public TaskContext(Integer stageId, Integer partitionId, Long attemptId) 
{
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+  }
+
+  private static ThreadLocal taskContext =
+new ThreadLocal();
+
+  public static TaskContext get() {
+return taskContext.get();
+  }
+
+  // List of callback functions to execute when the task completes.
+  private transient List onCompleteCallbacks =
+new ArrayList();
+
+  // Whether the corresponding task has been killed.
+  private volatile Boolean interrupted = false;
+
+  // Whether the task has completed.
+  private volatile Boolean completed = false;
+
+  /**
+   * Checks whether th

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-55879258
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20464/consoleFull)
 for   PR 2425 at commit 
[`a7d5e23`](https://github.com/apache/spark/commit/a7d5e23330a159e91581a35d46e1846770eb421d).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-55875586
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20461/consoleFull)
 for   PR 2425 at commit 
[`edf945e`](https://github.com/apache/spark/commit/edf945e6765314f0b87092d460f71aa70feecdc5).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-55873286
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20464/consoleFull)
 for   PR 2425 at commit 
[`a7d5e23`](https://github.com/apache/spark/commit/a7d5e23330a159e91581a35d46e1846770eb421d).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-55869923
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20455/consoleFull)
 for   PR 2425 at commit 
[`f716fd1`](https://github.com/apache/spark/commit/f716fd1b0d84bcb08889b62af2d5f7a6d14b1cab).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-55868820
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20461/consoleFull)
 for   PR 2425 at commit 
[`edf945e`](https://github.com/apache/spark/commit/edf945e6765314f0b87092d460f71aa70feecdc5).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread ScrapCodes
Github user ScrapCodes commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-55867575
  
@rxin One side question, Java8ApiSuite(s) don't compile, looks like we have 
been overlooking them for a while. May be we could just remove them ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17653450
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+public Integer stageId;
+public Integer partitionId;
+public Long attemptId;
+public Boolean runningLocally;
+public TaskMetrics taskMetrics;
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId, Boolean runningLocally,
+   TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+}
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId,
+   Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+}
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+}
+
+
+private static ThreadLocal taskContext =
--- End diff --

You mean text wrap ?, yes in one line they are 114 chars.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17651774
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+public Integer stageId;
+public Integer partitionId;
+public Long attemptId;
+public Boolean runningLocally;
+public TaskMetrics taskMetrics;
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId, Boolean runningLocally,
+   TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+}
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId,
+   Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+}
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+}
+
+
+private static ThreadLocal taskContext =
+new ThreadLocal();
+
+public static TaskContext get() {
+return taskContext.get();
+}
+
+// List of callback functions to execute when the task completes.
+private transient List onCompleteCallbacks =
+new ArrayList();
+
+// Whether the corresponding task has been killed.
+private volatile Boolean interrupted = false;
+
+// Whether the task has completed.
+private volatile Boolean completed = false;
+
+/**
+ * Checks whether the task has completed.
+ */
+public Boolean isCompleted() {
+return completed;
+}
+
+/**
+ * Checks whether the task has been killed.
+ */
+public Boolean isInterrupted() {
+return interrupted;
+}
+
+
+/**
+ * Add a (Java friendly) listener to be executed on task completion.
+ * This will be called in all situation - success, failure, or 
cancellation.
+ *
+ * An example use is for HadoopRDD to register a callback to close the 
input stream.
+ */
+public TaskContext addTaskCompletionListener(TaskCompletionListener 
listener){
+onCompleteCallbacks.add(listener);
+return this;
+}
+
+
+/**
+ * Add a listener in the form of a Scala closure to be executed on 
task completion.
+ * This will be called in all situation - success, failure, or 
cancellation.
+ *
+ * An example use is for HadoopRDD to register a callback to close the 
input stream.
+ */
+public TaskContext addTaskCompletionListener(final 
Function1 f) {
+onCompleteCallbacks.add( new TaskCompletionListener

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17651763
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+public Integer stageId;
+public Integer partitionId;
+public Long attemptId;
+public Boolean runningLocally;
+public TaskMetrics taskMetrics;
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId, Boolean runningLocally,
+   TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+}
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId,
+   Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+}
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+}
+
+
+private static ThreadLocal taskContext =
+new ThreadLocal();
+
+public static TaskContext get() {
+return taskContext.get();
+}
+
+// List of callback functions to execute when the task completes.
+private transient List onCompleteCallbacks =
+new ArrayList();
+
+// Whether the corresponding task has been killed.
+private volatile Boolean interrupted = false;
+
+// Whether the task has completed.
+private volatile Boolean completed = false;
+
+/**
+ * Checks whether the task has completed.
+ */
+public Boolean isCompleted() {
+return completed;
+}
+
+/**
+ * Checks whether the task has been killed.
+ */
+public Boolean isInterrupted() {
+return interrupted;
+}
+
+
+/**
+ * Add a (Java friendly) listener to be executed on task completion.
+ * This will be called in all situation - success, failure, or 
cancellation.
+ *
+ * An example use is for HadoopRDD to register a callback to close the 
input stream.
+ */
+public TaskContext addTaskCompletionListener(TaskCompletionListener 
listener){
--- End diff --

space before {


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17651760
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+public Integer stageId;
+public Integer partitionId;
+public Long attemptId;
+public Boolean runningLocally;
+public TaskMetrics taskMetrics;
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId, Boolean runningLocally,
+   TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+}
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId,
+   Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+}
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+}
+
+
+private static ThreadLocal taskContext =
+new ThreadLocal();
+
+public static TaskContext get() {
+return taskContext.get();
+}
+
+// List of callback functions to execute when the task completes.
+private transient List onCompleteCallbacks =
+new ArrayList();
+
+// Whether the corresponding task has been killed.
+private volatile Boolean interrupted = false;
+
+// Whether the task has completed.
+private volatile Boolean completed = false;
+
+/**
+ * Checks whether the task has completed.
+ */
+public Boolean isCompleted() {
+return completed;
+}
+
+/**
+ * Checks whether the task has been killed.
+ */
+public Boolean isInterrupted() {
+return interrupted;
+}
+
--- End diff --

can u go through the file and use only one line to separate functions


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17651668
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+public Integer stageId;
--- End diff --

actually don't make them public - this breaks binary compatibility right 
now.

you should make them private, and create a public accessor stageId()


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17651635
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+public Integer stageId;
--- End diff --

do we do 2 space or 4 space indent?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17651616
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+public Integer stageId;
+public Integer partitionId;
+public Long attemptId;
+public Boolean runningLocally;
+public TaskMetrics taskMetrics;
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId, Boolean runningLocally,
+   TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+}
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId,
+   Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+}
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+}
+
+
+private static ThreadLocal taskContext =
+new ThreadLocal();
+
+public static TaskContext get() {
+return taskContext.get();
+}
+
+// List of callback functions to execute when the task completes.
+private transient List onCompleteCallbacks =
+new ArrayList();
+
+// Whether the corresponding task has been killed.
+private volatile Boolean interrupted = false;
+
+// Whether the task has completed.
+private volatile Boolean completed = false;
+
+/**
+ * Checks whether the task has completed.
+ */
+public Boolean isCompleted() {
+return completed;
+}
+
+/**
+ * Checks whether the task has been killed.
+ */
+public Boolean isInterrupted() {
+return interrupted;
+}
+
+
+/**
+ * Add a (Java friendly) listener to be executed on task completion.
+ * This will be called in all situation - success, failure, or 
cancellation.
+ *
+ * An example use is for HadoopRDD to register a callback to close the 
input stream.
+ */
+public TaskContext addTaskCompletionListener(TaskCompletionListener 
listener){
+onCompleteCallbacks.add(listener);
+return this;
+}
+
+
+/**
+ * Add a listener in the form of a Scala closure to be executed on 
task completion.
+ * This will be called in all situation - success, failure, or 
cancellation.
+ *
+ * An example use is for HadoopRDD to register a callback to close the 
input stream.
+ */
+public TaskContext addTaskCompletionListener(final 
Function1 f) {
+onCompleteCallbacks.add( new TaskCompletionListener

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2425#issuecomment-55863345
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20455/consoleFull)
 for   PR 2425 at commit 
[`f716fd1`](https://github.com/apache/spark/commit/f716fd1b0d84bcb08889b62af2d5f7a6d14b1cab).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17651575
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+public Integer stageId;
+public Integer partitionId;
+public Long attemptId;
+public Boolean runningLocally;
+public TaskMetrics taskMetrics;
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId, Boolean runningLocally,
+   TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+}
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId,
+   Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+}
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+}
+
+
+private static ThreadLocal taskContext =
+new ThreadLocal();
+
+public static TaskContext get() {
+return taskContext.get();
+}
+
+// List of callback functions to execute when the task completes.
+private transient List onCompleteCallbacks =
+new ArrayList();
+
+// Whether the corresponding task has been killed.
+private volatile Boolean interrupted = false;
+
+// Whether the task has completed.
+private volatile Boolean completed = false;
+
+/**
+ * Checks whether the task has completed.
+ */
+public Boolean isCompleted() {
+return completed;
+}
+
+/**
+ * Checks whether the task has been killed.
+ */
+public Boolean isInterrupted() {
+return interrupted;
+}
+
+
+/**
+ * Add a (Java friendly) listener to be executed on task completion.
+ * This will be called in all situation - success, failure, or 
cancellation.
+ *
+ * An example use is for HadoopRDD to register a callback to close the 
input stream.
+ */
+public TaskContext addTaskCompletionListener(TaskCompletionListener 
listener){
+onCompleteCallbacks.add(listener);
+return this;
+}
+
+
+/**
+ * Add a listener in the form of a Scala closure to be executed on 
task completion.
+ * This will be called in all situation - success, failure, or 
cancellation.
+ *
+ * An example use is for HadoopRDD to register a callback to close the 
input stream.
+ */
+public TaskContext addTaskCompletionListener(final 
Function1 f) {
+onCompleteCallbacks.add( new TaskCompletionListener

[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17651524
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+public Integer stageId;
+public Integer partitionId;
+public Long attemptId;
+public Boolean runningLocally;
+public TaskMetrics taskMetrics;
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId, Boolean runningLocally,
+   TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+}
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId,
+   Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+}
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+}
+
+
+private static ThreadLocal taskContext =
--- End diff --

u don't need to wrap this, do u?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2425#discussion_r17651529
  
--- Diff: core/src/main/java/org/apache/spark/TaskContext.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Function0;
+import scala.Function1;
+import scala.Unit;
+import scala.collection.JavaConversions;
+
+import org.apache.spark.annotation.DeveloperApi;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.util.TaskCompletionListener;
+import org.apache.spark.util.TaskCompletionListenerException;
+
+@DeveloperApi
+public class TaskContext implements Serializable {
+
+public Integer stageId;
+public Integer partitionId;
+public Long attemptId;
+public Boolean runningLocally;
+public TaskMetrics taskMetrics;
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId, Boolean runningLocally,
+   TaskMetrics taskMetrics) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = taskMetrics;
+taskContext.set(this);
+}
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId,
+   Boolean runningLocally) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = runningLocally;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+}
+
+public TaskContext(Integer stageId, Integer partitionId, Long 
attemptId) {
+this.attemptId = attemptId;
+this.partitionId = partitionId;
+this.runningLocally = false;
+this.stageId = stageId;
+this.taskMetrics = TaskMetrics.empty();
+taskContext.set(this);
+}
+
--- End diff --

extra line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...

2014-09-17 Thread ScrapCodes
GitHub user ScrapCodes opened a pull request:

https://github.com/apache/spark/pull/2425

[SPARK-3543] Write TaskContext in Java and expose it through a static 
accessor.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ScrapCodes/spark-1 SPARK-3543/withTaskContext

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2425.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2425


commit 333c7d644973c721f0b0509399579723d2a43446
Author: Prashant Sharma 
Date:   2014-09-17T07:40:47Z

Translated Task context from scala to java.

commit f716fd1b0d84bcb08889b62af2d5f7a6d14b1cab
Author: Prashant Sharma 
Date:   2014-09-17T08:15:39Z

introduced thread local for getting the task context.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org