[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r18121901 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +/** +* :: DeveloperApi :: +* Contextual information about a task which can be read or mutated during execution. +*/ +@DeveloperApi +public class TaskContext implements Serializable { + + private int stageId; + private int partitionId; + private long attemptId; + private boolean runningLocally; + private TaskMetrics taskMetrics; + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + * @param taskMetrics performance metrics of the task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); + } + + private static ThreadLocal taskContext = +new ThreadLocal(); + + /** + * :: Internal API :: + * This is spark internal API, not intended to be called from user programs. 
+ */ + public static void setTaskContext(TaskContext tc) { +taskContext.set(tc); + } + + public static TaskContext get() { +return taskContext.get(); + } + + /** + * :: Internal API :: + */ + public static void remove() { +taskContext.remove(); + } + + // List of callback functions to execute when the task completes. + private transient List onCompleteCallbacks = +new ArrayLi
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/2425 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r18121892 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +/** +* :: DeveloperApi :: +* Contextual information about a task which can be read or mutated during execution. +*/ +@DeveloperApi +public class TaskContext implements Serializable { + + private int stageId; + private int partitionId; + private long attemptId; + private boolean runningLocally; + private TaskMetrics taskMetrics; + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + * @param taskMetrics performance metrics of the task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, --- End diff -- fyi all these should be primitives rather than boxed Integers/Longs/Booleans. I'm going to fix it in a separate pr.
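The comment above does not say why primitives are preferred. One likely reason, offered here only as an assumption, is that a boxed parameter accepts `null` and then fails when it is auto-unboxed into a primitive field. A minimal sketch follows; `BoxedContext` and `PrimitiveContext` are hypothetical stand-ins, not the real `org.apache.spark.TaskContext`.

```java
// Hypothetical stand-ins for illustration; not the real org.apache.spark.TaskContext.
public class BoxedVsPrimitive {

  /** Mirrors the constructor shape under review: boxed parameters, primitive fields. */
  static final class BoxedContext {
    private final int stageId;
    private final long attemptId;

    BoxedContext(Integer stageId, Long attemptId) {
      // Auto-unboxing happens on assignment; a null argument throws NullPointerException.
      this.stageId = stageId;
      this.attemptId = attemptId;
    }
  }

  /** The suggested shape: primitive parameters, so null cannot be passed at all. */
  static final class PrimitiveContext {
    private final int stageId;
    private final long attemptId;

    PrimitiveContext(int stageId, long attemptId) {
      this.stageId = stageId;
      this.attemptId = attemptId;
    }

    int stageId() { return stageId; }
    long attemptId() { return attemptId; }
  }

  /** Returns true if constructing a BoxedContext with a null stageId throws an NPE. */
  static boolean boxedConstructorRejectsNullAtRuntime() {
    try {
      new BoxedContext(null, 0L);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("boxed + null throws NPE: " + boxedConstructorRejectsNullAtRuntime());
    PrimitiveContext ctx = new PrimitiveContext(1, 2L);
    System.out.println("stageId=" + ctx.stageId() + ", attemptId=" + ctx.attemptId());
    // new PrimitiveContext(null, 2L); // would not compile: null is not an int
  }
}
```

With primitive parameters the mistake is caught by the compiler instead of surfacing at run time, and callers avoid allocating wrapper objects.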
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-57042070 Ok I'm merging this. I will push some minor cleanups after merging.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/2425#discussion_r18121869

--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
@@ -45,7 +45,8 @@ import org.apache.spark.util.Utils
 private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
 final def run(attemptId: Long): T = {
-context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
+context = new TaskContext(stageId, partitionId, attemptId, false)
--- End diff --

if the parameter is defined in java it cannot be named
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-56952085 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20856/consoleFull) for PR 2425 at commit [`8ae414c`](https://github.com/apache/spark/commit/8ae414c1ff2af5328cac7cc36b28e66f3aa6647d). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-56952094 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20856/
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user ScrapCodes commented on a diff in the pull request:
https://github.com/apache/spark/pull/2425#discussion_r18083933

--- Diff: core/src/test/scala/org/apache/spark/CacheManagerSuite.scala ---
@@ -94,7 +94,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
 }
 whenExecuting(blockManager) {
- val context = new TaskContext(0, 0, 0, runningLocally = true)
+ val context = new TaskContext(0, 0, 0, true)
--- End diff --

These were breaking compilation.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-56946632 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20856/consoleFull) for PR 2425 at commit [`8ae414c`](https://github.com/apache/spark/commit/8ae414c1ff2af5328cac7cc36b28e66f3aa6647d). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-56946056 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20855/consoleFull) for PR 2425 at commit [`5f8555b`](https://github.com/apache/spark/commit/5f8555b7742c3818fa38e57dea477cd2a9636711). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-56946058 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20855/
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-56945767 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20855/consoleFull) for PR 2425 at commit [`5f8555b`](https://github.com/apache/spark/commit/5f8555b7742c3818fa38e57dea477cd2a9636711). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-56944743 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20854/
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-56944740 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20854/consoleFull) for PR 2425 at commit [`155bc3b`](https://github.com/apache/spark/commit/155bc3b966b7b8960e09375275ef291d23feed53). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-5698 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20854/consoleFull) for PR 2425 at commit [`155bc3b`](https://github.com/apache/spark/commit/155bc3b966b7b8960e09375275ef291d23feed53). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-56776648 @ScrapCodes will you have time to address the feedback?
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-56286065 @ScrapCodes made another pass with some comments. Overall this is looking good
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user pwendell commented on a diff in the pull request:
https://github.com/apache/spark/pull/2425#discussion_r17822812

--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
@@ -45,7 +45,8 @@ import org.apache.spark.util.Utils
 private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
 final def run(attemptId: Long): T = {
-context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
+context = new TaskContext(stageId, partitionId, attemptId, false)
+TaskContext.setTaskContext(context)
--- End diff --

Can we also clear this after the task is run? It might be good to expose a static `remove` method in the `TaskContext` class. We should also put a warning to users never to call that method.
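The request to clear the thread-local after the task runs can be sketched with a `try`/`finally` around the task body. This is an illustrative pattern only, not the code that was merged; `ThreadLocalCleanup`, `Ctx`, and `runWithContext` are hypothetical names, and the concern about thread reuse assumes tasks run on pooled threads.

```java
import java.util.function.Supplier;

// Hypothetical sketch; names are illustrative, not Spark's actual classes.
public class ThreadLocalCleanup {

  /** Minimal stand-in for a per-task context object. */
  static final class Ctx {
    final int stageId;
    Ctx(int stageId) { this.stageId = stageId; }
  }

  private static final ThreadLocal<Ctx> CURRENT = new ThreadLocal<>();

  /** Returns the context installed on the calling thread, or null if none is set. */
  static Ctx get() { return CURRENT.get(); }

  /** Runs the task body with the context installed, and always clears it afterwards. */
  static <T> T runWithContext(Ctx ctx, Supplier<T> body) {
    CURRENT.set(ctx);
    try {
      return body.get();
    } finally {
      // If worker threads are pooled and reused, skipping this step would let
      // the next task on this thread observe a stale context.
      CURRENT.remove();
    }
  }

  public static void main(String[] args) {
    int seen = runWithContext(new Ctx(7), () -> get().stageId);
    System.out.println("stageId seen inside task: " + seen);
    System.out.println("context after task is null: " + (get() == null));
  }
}
```

Putting `remove()` in `finally` means the context is cleared even when the task body throws.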
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17822804 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +/** +* :: DeveloperApi :: +* Contextual information about a task which can be read or mutated during execution. +*/ +@DeveloperApi +public class TaskContext implements Serializable { + + private int stageId; + private int partitionId; + private long attemptId; + private boolean runningLocally; + private TaskMetrics taskMetrics; + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + * @param taskMetrics performance metrics of the task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); + } + + private static ThreadLocal taskContext = +new ThreadLocal(); + + public static void setTaskContext(TaskContext tc) { --- End diff -- Can you add a warning in the javadoc that explains this should never be called by user programs? 
It would be nice if this could be `private[spark]` but unfortunately we are in java land :P
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user pwendell commented on a diff in the pull request:
https://github.com/apache/spark/pull/2425#discussion_r17822777

--- Diff: core/src/test/scala/org/apache/spark/CacheManagerSuite.scala ---
@@ -94,7 +94,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
 }
 whenExecuting(blockManager) {
- val context = new TaskContext(0, 0, 0, runningLocally = true)
+ val context = new TaskContext(0, 0, 0, true)
--- End diff --

we should name it here too
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user pwendell commented on a diff in the pull request:
https://github.com/apache/spark/pull/2425#discussion_r17822775

--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
@@ -45,7 +45,8 @@ import org.apache.spark.util.Utils
 private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
 final def run(attemptId: Long): T = {
-context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
+context = new TaskContext(stageId, partitionId, attemptId, false)
--- End diff --

any reason to drop the naming for `runningLocally`? the correct style is to name all optional parameters.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17822735 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +/** +* :: DeveloperApi :: +* Contextual information about a task which can be read or mutated during execution. +*/ +@DeveloperApi +public class TaskContext implements Serializable { + + private int stageId; + private int partitionId; + private long attemptId; + private boolean runningLocally; + private TaskMetrics taskMetrics; + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + * @param taskMetrics performance metrics of the task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); + } + + private static ThreadLocal taskContext = +new ThreadLocal(); + + public static void setTaskContext(TaskContext tc) { +taskContext.set(tc); + } + + public static TaskContext get() { +return taskContext.get(); + } + + // List of callback functions to execute when the task completes. + private transient List onCompleteCallbacks = +new ArrayList(); + + // Whether the corresponding task has been killed. + private volatile Boolean interrupted = false; + + // Whether the task has completed. + private volatile Boolean completed = false; + + /** + * Checks whethe
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17822738 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +/** +* :: DeveloperApi :: +* Contextual information about a task which can be read or mutated during execution. +*/ +@DeveloperApi +public class TaskContext implements Serializable { + + private int stageId; + private int partitionId; + private long attemptId; + private boolean runningLocally; + private TaskMetrics taskMetrics; + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + * @param taskMetrics performance metrics of the task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); + } + + private static ThreadLocal taskContext = +new ThreadLocal(); + + public static void setTaskContext(TaskContext tc) { +taskContext.set(tc); + } + + public static TaskContext get() { +return taskContext.get(); + } + + // List of callback functions to execute when the task completes. + private transient List onCompleteCallbacks = +new ArrayList(); + + // Whether the corresponding task has been killed. + private volatile Boolean interrupted = false; + + // Whether the task has completed. + private volatile Boolean completed = false; + + /** + * Checks whethe
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17822726 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,238 @@
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17822725 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,238 @@
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17822722 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,238 @@
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-56024795 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20536/consoleFull) for PR 2425 at commit [`ee8bd00`](https://github.com/apache/spark/commit/ee8bd009fe8065c4ea018c4ff5baa32378606243). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-56018945 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20536/consoleFull) for PR 2425 at commit [`ee8bd00`](https://github.com/apache/spark/commit/ee8bd009fe8065c4ea018c4ff5baa32378606243). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17713666 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +/** +* :: DeveloperApi :: +* Contextual information about a task which can be read or mutated during execution. +*/ +@DeveloperApi +public class TaskContext implements Serializable { + + private Integer stageId; + private Integer partitionId; + private Long attemptId; + private Boolean runningLocally; + private TaskMetrics taskMetrics; + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + * @param taskMetrics performance metrics of the task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + private static ThreadLocal taskContext = --- End diff -- yes, get must be static. 
he's only thinking we should move the set to somewhere else
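The point above (keep `get` static, but do the `set` somewhere other than the constructors) can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in the pull request: `TaskContextSketch`, `runTask` and `unset` are hypothetical names, and the idea that whatever runs the task installs and clears the context is an assumption about where the `set` might move. The static `setTaskContext`/`get` pair mirrors what the later revision of the diff quoted in this thread exposes.

```java
// Hypothetical sketch; TaskContextSketch, runTask and unset are illustrative
// names, not the classes or methods from the pull request.
final class TaskContextSketch {
  private final int stageId;
  private final int partitionId;
  private final long attemptId;

  // One slot per thread: each task thread sees only the context installed on it.
  private static final ThreadLocal<TaskContextSketch> CURRENT = new ThreadLocal<>();

  TaskContextSketch(int stageId, int partitionId, long attemptId) {
    this.stageId = stageId;
    this.partitionId = partitionId;
    this.attemptId = attemptId;
    // Deliberately no CURRENT.set(this) here: constructing a context
    // does not make it the current one as a side effect.
  }

  // Static accessor so user code can reach the context without a reference to it.
  static TaskContextSketch get() {
    return CURRENT.get();
  }

  // The set lives outside the constructors and is called by whoever runs the task.
  static void setTaskContext(TaskContextSketch tc) {
    CURRENT.set(tc);
  }

  // Assumption: the runner clears the slot so a reused thread does not leak state.
  static void unset() {
    CURRENT.remove();
  }

  int stageId() { return stageId; }
  int partitionId() { return partitionId; }
  long attemptId() { return attemptId; }

  // Hypothetical task runner: installs the context, runs the body, then clears it.
  static int runTask(TaskContextSketch tc) {
    setTaskContext(tc);
    try {
      // Stand-in for user code that looks up the context statically.
      return get().partitionId();
    } finally {
      unset();
    }
  }
}
```

With this shape, `TaskContextSketch.get()` returns null on a thread where no task is running, and building a context object (for example in a test) leaves the current thread's slot untouched.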
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17713674 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +/** +* :: DeveloperApi :: +* Contextual information about a task which can be read or mutated during execution. +*/ +@DeveloperApi +public class TaskContext implements Serializable { + + private Integer stageId; + private Integer partitionId; + private Long attemptId; + private Boolean runningLocally; + private TaskMetrics taskMetrics; + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + * @param taskMetrics performance metrics of the task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + private static ThreadLocal taskContext = --- End diff -- assuming i'm reading his mind correctly :)
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user ScrapCodes commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17713594 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +/** +* :: DeveloperApi :: +* Contextual information about a task which can be read or mutated during execution. +*/ +@DeveloperApi +public class TaskContext implements Serializable { + + private Integer stageId; + private Integer partitionId; + private Long attemptId; + private Boolean runningLocally; + private TaskMetrics taskMetrics; + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + * @param taskMetrics performance metrics of the task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + private static ThreadLocal taskContext = --- End diff -- But then how would someone access it ? 
I hope he wants to retain at least the get method inside TaskContext as static.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17713298 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +/** +* :: DeveloperApi :: +* Contextual information about a task which can be read or mutated during execution. +*/ +@DeveloperApi +public class TaskContext implements Serializable { + + private Integer stageId; + private Integer partitionId; + private Long attemptId; + private Boolean runningLocally; + private TaskMetrics taskMetrics; + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + * @param taskMetrics performance metrics of the task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + private static ThreadLocal<TaskContext> taskContext = +new ThreadLocal<TaskContext>(); + + public static TaskContext get() { +return taskContext.get(); + } + + // List of callback functions to execute when the task completes. + private transient List<TaskCompletionListener> onCompleteCallbacks = +new ArrayList<TaskCompletionListener>(); + + // Whether the corresponding task has been killed. + private volatile Boolean interrupted = false; + + // Whether the task has completed. + private volatile Boolean completed = false; + + /** + * Checks whether th
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user ScrapCodes commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17713233 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +/** +* :: DeveloperApi :: +* Contextual information about a task which can be read or mutated during execution. +*/ +@DeveloperApi +public class TaskContext implements Serializable { + + private Integer stageId; + private Integer partitionId; + private Long attemptId; + private Boolean runningLocally; + private TaskMetrics taskMetrics; + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + * @param taskMetrics performance metrics of the task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + private static ThreadLocal<TaskContext> taskContext = +new ThreadLocal<TaskContext>(); + + public static TaskContext get() { +return taskContext.get(); + } + + // List of callback functions to execute when the task completes. + private transient List<TaskCompletionListener> onCompleteCallbacks = +new ArrayList<TaskCompletionListener>(); + + // Whether the corresponding task has been killed. + private volatile Boolean interrupted = false; + + // Whether the task has completed. + private volatile Boolean completed = false; + + /** + * Checks whet
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user ScrapCodes commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17713126 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +/** +* :: DeveloperApi :: +* Contextual information about a task which can be read or mutated during execution. +*/ +@DeveloperApi +public class TaskContext implements Serializable { + + private Integer stageId; + private Integer partitionId; + private Long attemptId; + private Boolean runningLocally; + private TaskMetrics taskMetrics; + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + * @param taskMetrics performance metrics of the task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + private static ThreadLocal taskContext = --- End diff -- alright
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17712911 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +/** +* :: DeveloperApi :: +* Contextual information about a task which can be read or mutated during execution. +*/ +@DeveloperApi +public class TaskContext implements Serializable { + + private Integer stageId; + private Integer partitionId; + private Long attemptId; + private Boolean runningLocally; + private TaskMetrics taskMetrics; + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + * @param taskMetrics performance metrics of the task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + private static ThreadLocal taskContext = --- End diff -- I think Patrick meant putting it in the executor or in Task's run method.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user ScrapCodes commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17712697 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +/** +* :: DeveloperApi :: +* Contextual information about a task which can be read or mutated during execution. +*/ +@DeveloperApi +public class TaskContext implements Serializable { + + private Integer stageId; + private Integer partitionId; + private Long attemptId; + private Boolean runningLocally; + private TaskMetrics taskMetrics; + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + * @param taskMetrics performance metrics of the task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + private static ThreadLocal taskContext = --- End diff -- But then its already static code ? + Where else should I put it ? 
I
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17688175 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +/** +* :: DeveloperApi :: +* Contextual information about a task which can be read or mutated during execution. +*/ +@DeveloperApi +public class TaskContext implements Serializable { + + private Integer stageId; + private Integer partitionId; + private Long attemptId; + private Boolean runningLocally; + private TaskMetrics taskMetrics; + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + * @param taskMetrics performance metrics of the task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + private static ThreadLocal taskContext = --- End diff -- I would remove this and the below function. 
I think it's better if the `TaskContext` class is not itself aware of being stored as a thread local. We can keep all the logic related to thread locals in the static code.
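As a rough illustration of the separation suggested here, the sketch below keeps the context class free of any thread-local knowledge and moves the bookkeeping into a separate static holder, which also keeps a static `get` available to callers. The names `SimpleTaskContext` and `TaskContextHolder` are hypothetical and are not taken from the pull request; this is one possible shape under those assumptions, not the change that was merged.

```java
// Illustrative sketch only: SimpleTaskContext and TaskContextHolder are
// hypothetical names, not classes from the Spark code base.
final class SimpleTaskContext {
    private final int stageId;
    private final int partitionId;
    private final long attemptId;

    SimpleTaskContext(int stageId, int partitionId, long attemptId) {
        // No thread-local access here: construction has no side effects.
        this.stageId = stageId;
        this.partitionId = partitionId;
        this.attemptId = attemptId;
    }

    int stageId() { return stageId; }
    int partitionId() { return partitionId; }
    long attemptId() { return attemptId; }
}

// All thread-local bookkeeping lives in static code outside the context class.
final class TaskContextHolder {
    private static final ThreadLocal<SimpleTaskContext> CURRENT =
        new ThreadLocal<SimpleTaskContext>();

    private TaskContextHolder() {}

    // Static accessor, so user code can still look up the current context.
    static SimpleTaskContext get() { return CURRENT.get(); }

    static void set(SimpleTaskContext context) { CURRENT.set(context); }

    static void remove() { CURRENT.remove(); }
}
```

With this split, creating a `SimpleTaskContext` in a test or on the driver does not silently overwrite whatever context the current thread already holds.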
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17688016 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +/** +* :: DeveloperApi :: +* Contextual information about a task which can be read or mutated during execution. +*/ +@DeveloperApi +public class TaskContext implements Serializable { + + private Integer stageId; + private Integer partitionId; + private Long attemptId; + private Boolean runningLocally; + private TaskMetrics taskMetrics; + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + * @param taskMetrics performance metrics of the task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); --- End diff -- I think it might be better to explicitly set this in the executor when we create the context rather than doing it as a side effect of construction. What do you think @rxin?
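A minimal sketch of what "explicitly set this in the executor" could look like: the code that creates the context registers it just before running the task body and clears it afterwards. `DemoContext` and `DemoTaskRunner` are hypothetical stand-ins, not Spark's `Executor` or `Task`, and `java.util.function.Supplier` is used only to keep the example short (it assumes Java 8 or later).

```java
import java.util.function.Supplier;

// Hypothetical stand-ins for illustration; not Spark's Executor or Task classes.
final class DemoContext {
    final int partitionId;

    DemoContext(int partitionId) {
        // The constructor only stores state; it does not register itself anywhere.
        this.partitionId = partitionId;
    }
}

final class DemoTaskRunner {
    private static final ThreadLocal<DemoContext> CURRENT =
        new ThreadLocal<DemoContext>();

    private DemoTaskRunner() {}

    static DemoContext current() { return CURRENT.get(); }

    // The caller that creates the context registers it explicitly, then clears
    // it in finally so a pooled thread does not carry it into the next task.
    static <T> T run(DemoContext context, Supplier<T> body) {
        CURRENT.set(context);
        try {
            return body.get();
        } finally {
            CURRENT.remove();
        }
    }
}
```

A call such as `DemoTaskRunner.run(new DemoContext(7), () -> DemoTaskRunner.current().partitionId)` sees the context inside the body, while code running on the same thread before or after the call sees `null`.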
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17678577 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +/** +* :: DeveloperApi :: +* Contextual information about a task which can be read or mutated during execution. +*/ +@DeveloperApi +public class TaskContext implements Serializable { + + private Integer stageId; + private Integer partitionId; + private Long attemptId; + private Boolean runningLocally; + private TaskMetrics taskMetrics; + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + * @param taskMetrics performance metrics of the task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. + * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + * @param runningLocally whether the task is running locally in the driver JVM + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + + /** + * :: DeveloperApi :: + * Contextual information about a task which can be read or mutated during execution. 
+ * + * @param stageId stage id + * @param partitionId index of the partition + * @param attemptId the number of attempts to execute this task + */ + @DeveloperApi + public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); + } + + private static ThreadLocal taskContext = +new ThreadLocal(); + + public static TaskContext get() { +return taskContext.get(); + } + + // List of callback functions to execute when the task completes. + private transient List onCompleteCallbacks = +new ArrayList(); + + // Whether the corresponding task has been killed. + private volatile Boolean interrupted = false; + + // Whether the task has completed. + private volatile Boolean completed = false; + + /** + * Checks whether th
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17678541 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +/** +* :: DeveloperApi :: +* Contextual information about a task which can be read or mutated during execution. +*/ +@DeveloperApi +public class TaskContext implements Serializable { + + private Integer stageId; --- End diff -- int, int, long, bool --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
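The remark above ("int, int, long, bool") appears to ask for primitive field types in place of the boxed `Integer`, `Integer`, `Long` and `Boolean` (Java spells the last one `boolean`). A minimal sketch of why that matters, assuming a hypothetical `TaskIds` class that is not part of the pull request: primitive fields can never be null, whereas auto-unboxing a null wrapper throws a `NullPointerException`.

```java
// Hypothetical illustration only; TaskIds is not a class from the pull request.
final class TaskIds {
    // Primitive fields can never be null and need no unboxing when read.
    private final int stageId;
    private final int partitionId;
    private final long attemptId;
    private final boolean runningLocally;

    TaskIds(int stageId, int partitionId, long attemptId, boolean runningLocally) {
        this.stageId = stageId;
        this.partitionId = partitionId;
        this.attemptId = attemptId;
        this.runningLocally = runningLocally;
    }

    int stageId() { return stageId; }
    int partitionId() { return partitionId; }
    long attemptId() { return attemptId; }
    boolean isRunningLocally() { return runningLocally; }

    // Returns true if auto-unboxing the given wrapper throws, which happens when it is null.
    static boolean unboxingThrows(Boolean flag) {
        try {
            boolean unboxed = flag; // implicit flag.booleanValue()
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        TaskIds ids = new TaskIds(1, 2, 3L, false);
        System.out.println(ids.attemptId() == 3L);
        System.out.println(unboxingThrows(null));
    }
}
```

With boxed fields, a caller passing `null` to the constructor would only fail later, at the first unboxing site; with primitives the mistake cannot be expressed at all.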
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17678511 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,234 @@ [quoted diff identical to the one in discussion_r17678577; cut off in the archive before the review remark]
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17678497 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,234 @@ [quoted diff identical to the one in discussion_r17678577; cut off in the archive before the review remark]
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17678506 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,234 @@ [quoted diff identical to the one in discussion_r17678577; cut off in the archive before the review remark]
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-55879258 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20464/consoleFull) for PR 2425 at commit [`a7d5e23`](https://github.com/apache/spark/commit/a7d5e23330a159e91581a35d46e1846770eb421d). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-55875586 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20461/consoleFull) for PR 2425 at commit [`edf945e`](https://github.com/apache/spark/commit/edf945e6765314f0b87092d460f71aa70feecdc5). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-55873286 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20464/consoleFull) for PR 2425 at commit [`a7d5e23`](https://github.com/apache/spark/commit/a7d5e23330a159e91581a35d46e1846770eb421d). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-55869923 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20455/consoleFull) for PR 2425 at commit [`f716fd1`](https://github.com/apache/spark/commit/f716fd1b0d84bcb08889b62af2d5f7a6d14b1cab). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-55868820 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20461/consoleFull) for PR 2425 at commit [`edf945e`](https://github.com/apache/spark/commit/edf945e6765314f0b87092d460f71aa70feecdc5). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user ScrapCodes commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-55867575 @rxin One side question: the Java8ApiSuite(s) don't compile; it looks like we have been overlooking them for a while. Maybe we could just remove them?
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user ScrapCodes commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17653450 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +@DeveloperApi +public class TaskContext implements Serializable { + +public Integer stageId; +public Integer partitionId; +public Long attemptId; +public Boolean runningLocally; +public TaskMetrics taskMetrics; + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); +} + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); +} + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); +} + + +private static ThreadLocal taskContext = --- End diff -- You mean text wrap? Yes, on one line they are 114 chars.
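For context on the wrapped declaration discussed above, here is a minimal sketch of a thread-local holder split across two lines. The archived diff shows `ThreadLocal` without type parameters, so the `<ContextHolder>` arguments, the `ContextHolder` class itself and the 100-character line limit mentioned in the comment are all assumptions made for illustration, not details taken from the pull request.

```java
// Hypothetical stand-in for the class in the diff; only the thread-local holder is sketched.
final class ContextHolder {
    // Wrapped after '=' so the declaration stays short (a 100-character limit is assumed).
    private static final ThreadLocal<ContextHolder> CURRENT =
        new ThreadLocal<ContextHolder>();

    private final int stageId;

    ContextHolder(int stageId) {
        this.stageId = stageId;
        CURRENT.set(this); // the constructor registers the instance for the calling thread
    }

    // Returns the instance created on the calling thread, or null if there is none.
    static ContextHolder get() {
        return CURRENT.get();
    }

    int stageId() { return stageId; }

    public static void main(String[] args) {
        ContextHolder created = new ContextHolder(7);
        System.out.println(ContextHolder.get() == created);
    }
}
```

Because the value is stored per thread, a call to `get()` from a thread that never constructed an instance returns null rather than another thread's context.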
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17651774 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +@DeveloperApi +public class TaskContext implements Serializable { + +public Integer stageId; +public Integer partitionId; +public Long attemptId; +public Boolean runningLocally; +public TaskMetrics taskMetrics; + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); +} + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); +} + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); +} + + +private static ThreadLocal taskContext = +new ThreadLocal(); + +public static TaskContext get() { +return taskContext.get(); +} + +// List of callback functions to execute when the task completes. +private transient List onCompleteCallbacks = +new ArrayList(); + +// Whether the corresponding task has been killed. +private volatile Boolean interrupted = false; + +// Whether the task has completed. 
+private volatile Boolean completed = false; + +/** + * Checks whether the task has completed. + */ +public Boolean isCompleted() { +return completed; +} + +/** + * Checks whether the task has been killed. + */ +public Boolean isInterrupted() { +return interrupted; +} + + +/** + * Add a (Java friendly) listener to be executed on task completion. + * This will be called in all situation - success, failure, or cancellation. + * + * An example use is for HadoopRDD to register a callback to close the input stream. + */ +public TaskContext addTaskCompletionListener(TaskCompletionListener listener){ +onCompleteCallbacks.add(listener); +return this; +} + + +/** + * Add a listener in the form of a Scala closure to be executed on task completion. + * This will be called in all situation - success, failure, or cancellation. + * + * An example use is for HadoopRDD to register a callback to close the input stream. + */ +public TaskContext addTaskCompletionListener(final Function1 f) { +onCompleteCallbacks.add( new TaskCompletionListener
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17651763 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +@DeveloperApi +public class TaskContext implements Serializable { + +public Integer stageId; +public Integer partitionId; +public Long attemptId; +public Boolean runningLocally; +public TaskMetrics taskMetrics; + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); +} + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); +} + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); +} + + +private static ThreadLocal taskContext = +new ThreadLocal(); + +public static TaskContext get() { +return taskContext.get(); +} + +// List of callback functions to execute when the task completes. +private transient List onCompleteCallbacks = +new ArrayList(); + +// Whether the corresponding task has been killed. +private volatile Boolean interrupted = false; + +// Whether the task has completed. 
+private volatile Boolean completed = false; + +/** + * Checks whether the task has completed. + */ +public Boolean isCompleted() { +return completed; +} + +/** + * Checks whether the task has been killed. + */ +public Boolean isInterrupted() { +return interrupted; +} + + +/** + * Add a (Java friendly) listener to be executed on task completion. + * This will be called in all situation - success, failure, or cancellation. + * + * An example use is for HadoopRDD to register a callback to close the input stream. + */ +public TaskContext addTaskCompletionListener(TaskCompletionListener listener){ --- End diff -- space before { --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17651760 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +@DeveloperApi +public class TaskContext implements Serializable { + +public Integer stageId; +public Integer partitionId; +public Long attemptId; +public Boolean runningLocally; +public TaskMetrics taskMetrics; + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); +} + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); +} + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); +} + + +private static ThreadLocal taskContext = +new ThreadLocal(); + +public static TaskContext get() { +return taskContext.get(); +} + +// List of callback functions to execute when the task completes. +private transient List onCompleteCallbacks = +new ArrayList(); + +// Whether the corresponding task has been killed. +private volatile Boolean interrupted = false; + +// Whether the task has completed. 
+private volatile Boolean completed = false; + +/** + * Checks whether the task has completed. + */ +public Boolean isCompleted() { +return completed; +} + +/** + * Checks whether the task has been killed. + */ +public Boolean isInterrupted() { +return interrupted; +} + --- End diff -- can u go through the file and use only one line to separate functions --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17651668 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +@DeveloperApi +public class TaskContext implements Serializable { + +public Integer stageId; --- End diff -- actually don't make them public - this breaks binary compatibility right now. you should make them private, and create a public accessor stageId() --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
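A minimal sketch of the shape rxin asks for in the comment above: private fields with public accessor methods. It assumes the concern is that a Scala `val` compiles to a private field plus a public method of the same name, so callers built against the earlier Scala class link to `stageId()` rather than to a public field. The class name `TaskContextAccessorSketch` is hypothetical, and only three of the fields from the diff are shown.

```java
// Hypothetical sketch, not the code from the pull request.
final class TaskContextAccessorSketch {
  // Fields stay private so they are not part of the public binary interface.
  private final int stageId;
  private final int partitionId;
  private final long attemptId;

  TaskContextAccessorSketch(int stageId, int partitionId, long attemptId) {
    this.stageId = stageId;
    this.partitionId = partitionId;
    this.attemptId = attemptId;
  }

  // Accessors carry the same names a Scala `val` would expose as methods.
  public int stageId() { return stageId; }

  public int partitionId() { return partitionId; }

  public long attemptId() { return attemptId; }

  public static void main(String[] args) {
    TaskContextAccessorSketch ctx = new TaskContextAccessorSketch(3, 0, 42L);
    // prints "3 0 42"
    System.out.println(ctx.stageId() + " " + ctx.partitionId() + " " + ctx.attemptId());
  }
}
```

Using primitive `int` and `long` here rather than the boxed `Integer` and `Long` in the diff is a choice of this sketch; it also avoids null values for fields that are always set.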
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17651635 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +@DeveloperApi +public class TaskContext implements Serializable { + +public Integer stageId; --- End diff -- do we do 2 space or 4 space indent? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17651616 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +@DeveloperApi +public class TaskContext implements Serializable { + +public Integer stageId; +public Integer partitionId; +public Long attemptId; +public Boolean runningLocally; +public TaskMetrics taskMetrics; + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); +} + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); +} + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); +} + + +private static ThreadLocal taskContext = +new ThreadLocal(); + +public static TaskContext get() { +return taskContext.get(); +} + +// List of callback functions to execute when the task completes. +private transient List onCompleteCallbacks = +new ArrayList(); + +// Whether the corresponding task has been killed. +private volatile Boolean interrupted = false; + +// Whether the task has completed. 
+private volatile Boolean completed = false; + +/** + * Checks whether the task has completed. + */ +public Boolean isCompleted() { +return completed; +} + +/** + * Checks whether the task has been killed. + */ +public Boolean isInterrupted() { +return interrupted; +} + + +/** + * Add a (Java friendly) listener to be executed on task completion. + * This will be called in all situation - success, failure, or cancellation. + * + * An example use is for HadoopRDD to register a callback to close the input stream. + */ +public TaskContext addTaskCompletionListener(TaskCompletionListener listener){ +onCompleteCallbacks.add(listener); +return this; +} + + +/** + * Add a listener in the form of a Scala closure to be executed on task completion. + * This will be called in all situation - success, failure, or cancellation. + * + * An example use is for HadoopRDD to register a callback to close the input stream. + */ +public TaskContext addTaskCompletionListener(final Function1 f) { +onCompleteCallbacks.add( new TaskCompletionListener
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2425#issuecomment-55863345 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20455/consoleFull) for PR 2425 at commit [`f716fd1`](https://github.com/apache/spark/commit/f716fd1b0d84bcb08889b62af2d5f7a6d14b1cab). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17651575 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +@DeveloperApi +public class TaskContext implements Serializable { + +public Integer stageId; +public Integer partitionId; +public Long attemptId; +public Boolean runningLocally; +public TaskMetrics taskMetrics; + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); +} + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); +} + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); +} + + +private static ThreadLocal taskContext = +new ThreadLocal(); + +public static TaskContext get() { +return taskContext.get(); +} + +// List of callback functions to execute when the task completes. +private transient List onCompleteCallbacks = +new ArrayList(); + +// Whether the corresponding task has been killed. +private volatile Boolean interrupted = false; + +// Whether the task has completed. 
+private volatile Boolean completed = false; + +/** + * Checks whether the task has completed. + */ +public Boolean isCompleted() { +return completed; +} + +/** + * Checks whether the task has been killed. + */ +public Boolean isInterrupted() { +return interrupted; +} + + +/** + * Add a (Java friendly) listener to be executed on task completion. + * This will be called in all situation - success, failure, or cancellation. + * + * An example use is for HadoopRDD to register a callback to close the input stream. + */ +public TaskContext addTaskCompletionListener(TaskCompletionListener listener){ +onCompleteCallbacks.add(listener); +return this; +} + + +/** + * Add a listener in the form of a Scala closure to be executed on task completion. + * This will be called in all situation - success, failure, or cancellation. + * + * An example use is for HadoopRDD to register a callback to close the input stream. + */ +public TaskContext addTaskCompletionListener(final Function1 f) { +onCompleteCallbacks.add( new TaskCompletionListener
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17651524 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +@DeveloperApi +public class TaskContext implements Serializable { + +public Integer stageId; +public Integer partitionId; +public Long attemptId; +public Boolean runningLocally; +public TaskMetrics taskMetrics; + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); +} + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); +} + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); +} + + +private static ThreadLocal taskContext = --- End diff -- u don't need to wrap this, do u? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2425#discussion_r17651529 --- Diff: core/src/main/java/org/apache/spark/TaskContext.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Function0; +import scala.Function1; +import scala.Unit; +import scala.collection.JavaConversions; + +import org.apache.spark.annotation.DeveloperApi; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.util.TaskCompletionListener; +import org.apache.spark.util.TaskCompletionListenerException; + +@DeveloperApi +public class TaskContext implements Serializable { + +public Integer stageId; +public Integer partitionId; +public Long attemptId; +public Boolean runningLocally; +public TaskMetrics taskMetrics; + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId, Boolean runningLocally, + TaskMetrics taskMetrics) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = taskMetrics; +taskContext.set(this); +} + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId, + Boolean runningLocally) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = runningLocally; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); +} + +public TaskContext(Integer stageId, Integer partitionId, Long attemptId) { +this.attemptId = attemptId; +this.partitionId = partitionId; +this.runningLocally = false; +this.stageId = stageId; +this.taskMetrics = TaskMetrics.empty(); +taskContext.set(this); +} + --- End diff -- extra line --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request: [SPARK-3543] Write TaskContext in Java and exp...
GitHub user ScrapCodes opened a pull request:

    https://github.com/apache/spark/pull/2425

    [SPARK-3543] Write TaskContext in Java and expose it through a static accessor.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ScrapCodes/spark-1 SPARK-3543/withTaskContext

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2425.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2425

commit 333c7d644973c721f0b0509399579723d2a43446
Author: Prashant Sharma
Date: 2014-09-17T07:40:47Z

    Translated Task context from scala to java.

commit f716fd1b0d84bcb08889b62af2d5f7a6d14b1cab
Author: Prashant Sharma
Date: 2014-09-17T08:15:39Z

    introduced thread local for getting the task context.
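A minimal sketch of the "static accessor backed by a thread local" idea named in the pull request title and second commit, assuming the constructor registers the instance the way `taskContext.set(this)` does in the diff under review. The class name `ThreadLocalContextSketch` and the helpers `unset()` and `getOnNewThread()` are hypothetical and exist only to make the per-thread behaviour visible; they are not part of the patch.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not the code from the pull request.
final class ThreadLocalContextSketch {
  // One slot per thread; each thread sees only the context it registered.
  private static final ThreadLocal<ThreadLocalContextSketch> CURRENT = new ThreadLocal<>();

  private final int stageId;

  ThreadLocalContextSketch(int stageId) {
    this.stageId = stageId;
    CURRENT.set(this); // mirrors taskContext.set(this) in the diff
  }

  // Static accessor: returns the context registered on the calling thread, or null.
  static ThreadLocalContextSketch get() { return CURRENT.get(); }

  // Clears the calling thread's slot (assumed helper, not in the diff).
  static void unset() { CURRENT.remove(); }

  int stageId() { return stageId; }

  // Reports what get() returns on a freshly started thread.
  static ThreadLocalContextSketch getOnNewThread() {
    AtomicReference<ThreadLocalContextSketch> seen = new AtomicReference<>();
    Thread t = new Thread(() -> seen.set(get()));
    t.start();
    try {
      t.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return seen.get();
  }

  public static void main(String[] args) {
    new ThreadLocalContextSketch(7);
    System.out.println(get().stageId());  // prints 7
    System.out.println(getOnNewThread()); // prints null
    unset();
  }
}
```

The second print shows the trade-off of this design: code running on a thread that did not construct the context gets `null` from the accessor, so callers that hand work to other threads would need to pass the context explicitly.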