[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/1297#discussion_r17790219

--- Diff: core/src/main/scala/org/apache/spark/rdd/IndexedRDDLike.scala ---
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.immutable.LongMap
+import scala.language.higherKinds
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import org.apache.spark.storage.StorageLevel
+
+import IndexedRDD.Id
+
+/**
+ * Contains members that are shared among all variants of IndexedRDD (e.g., IndexedRDD,
+ * VertexRDD).
+ *
+ * @tparam V the type of the values stored in the IndexedRDD
+ * @tparam P the type of the partitions making up the IndexedRDD
+ * @tparam Self the type of the implementing container. This allows transformation methods on any
+ *   implementing container to yield a result of the same type.
+ */
+private[spark] trait IndexedRDDLike[
+    @specialized(Long, Int, Double) V,
+    P[X] <: IndexedRDDPartitionLike[X, P],
+    Self[X] <: IndexedRDDLike[X, P, Self]]
+  extends RDD[(Id, V)] {
+
+  /** A generator for ClassTags of the value type V. */
+  protected implicit def vTag: ClassTag[V]
+
+  /** A generator for ClassTags of the partition type P. */
+  protected implicit def pTag[V2]: ClassTag[P[V2]]
+
+  /** Accessor for the IndexedRDD variant that is mixing in this trait. */
+  protected def self: Self[V]
+
+  /** The underlying representation of the IndexedRDD as an RDD of partitions. */
+  def partitionsRDD: RDD[P[V]]
+  require(partitionsRDD.partitioner.isDefined)
+
+  def withPartitionsRDD[V2: ClassTag](partitionsRDD: RDD[P[V2]]): Self[V2]
+
+  override val partitioner = partitionsRDD.partitioner
+
+  override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
+
+  override protected def getPreferredLocations(s: Partition): Seq[String] =
+    partitionsRDD.preferredLocations(s)
+
+  override def persist(newLevel: StorageLevel): this.type = {
+    partitionsRDD.persist(newLevel)
+    this
+  }
+
+  override def unpersist(blocking: Boolean = true): this.type = {
+    partitionsRDD.unpersist(blocking)
+    this
+  }
+
+  override def count(): Long = {
+    partitionsRDD.map(_.size).reduce(_ + _)
+  }
+
+  /** Provides the `RDD[(Id, V)]` equivalent output. */
+  override def compute(part: Partition, context: TaskContext): Iterator[(Id, V)] = {
+    firstParent[P[V]].iterator(part, context).next.iterator
+  }
+
+  /** Gets the value corresponding to the specified key, if any. */
+  def get(k: Id): Option[V] = multiget(Array(k)).get(k)
+
+  /** Gets the values corresponding to the specified keys, if any. */
+  def multiget(ks: Array[Id]): Map[Id, V] = {
+    val ksByPartition = ks.groupBy(k => self.partitioner.get.getPartition(k))
+    val partitions = ksByPartition.keys.toSeq
+    def unionMaps(maps: TraversableOnce[LongMap[V]]): LongMap[V] = {
+      maps.foldLeft(LongMap.empty[V]) {
+        (accum, map) => accum.unionWith(map, (id, a, b) => a)
+      }
+    }
+    // TODO: avoid sending all keys to all partitions by creating and zipping an RDD of keys
--- End diff --

would this be another use of the `bulkMultiget` I suggested in jira?
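The TODO above is about making sure each partition only receives the keys it owns, which is also what a `bulkMultiget` would need. A minimal, hypothetical sketch of that routing in plain Scala, with no Spark dependency: partitions are modeled as an in-memory array of maps, and `MultigetSketch`, `partitionOf`, and `partitionData` are illustrative names, not Spark or IndexedRDD API.

```scala
// Hypothetical sketch: an "IndexedRDD" modeled as an array of per-partition maps.
object MultigetSketch {
  // Non-negative modulo of the key's hash, in the spirit of hash partitioning.
  def partitionOf(k: Long, numPartitions: Int): Int = {
    val raw = java.lang.Long.hashCode(k) % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Build one in-memory "partition" (a Map) per partition id.
  def partitionData[V](data: Seq[(Long, V)], numPartitions: Int): Array[Map[Long, V]] = {
    val grouped = data.groupBy { case (k, _) => partitionOf(k, numPartitions) }
    Array.tabulate(numPartitions)(p => grouped.getOrElse(p, Seq.empty).toMap)
  }

  // Look up keys, visiting only partitions that own at least one requested key.
  // Returns the found values plus the set of partition ids that were visited.
  def multiget[V](parts: Array[Map[Long, V]], ks: Seq[Long]): (Map[Long, V], Set[Int]) = {
    val ksByPartition = ks.groupBy(k => partitionOf(k, parts.length))
    val results = ksByPartition.toSeq.flatMap { case (p, keys) =>
      keys.flatMap(k => parts(p).get(k).map(v => k -> v))
    }.toMap
    (results, ksByPartition.keySet)
  }
}
```

Missing keys are simply absent from the result map, matching the `Map[Id, V]` return type of `multiget` in the diff.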
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/1297#discussion_r17791303

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ImmutableLongOpenHashSet.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect._
+import com.google.common.hash.Hashing
+
+/**
+ * A fast, immutable hash set optimized for insertions and lookups (but not deletions) of `Long`
+ * elements. Because it exposes the position of a key in the underlying array, this is useful as a
+ * building block for higher level data structures such as a hash map (for example,
+ * IndexedRDDPartition).
+ *
+ * It uses quadratic probing with a power-of-2 hash table size, which is guaranteed to explore all
+ * spaces for each key (see http://en.wikipedia.org/wiki/Quadratic_probing).
+ */
+private[spark] class ImmutableLongOpenHashSet(
+    /** Underlying array of elements used as a hash table. */
+    val data: ImmutableVector[Long],
+    /** Whether or not there is an element at the corresponding position in `data`. */
+    val bitset: ImmutableBitSet,
+    /**
+     * Position of a focused element. This is useful when returning a modified set along with a
+     * pointer to the location of modification.
+     */
+    val focus: Int,
+    /** Load threshold at which to grow the underlying vectors. */
+    loadFactor: Double
+  ) extends Serializable {
+
+  require(loadFactor < 1.0, "Load factor must be less than 1.0")
+  require(loadFactor > 0.0, "Load factor must be greater than 0.0")
+  require(capacity == nextPowerOf2(capacity), "data capacity must be a power of 2")
+
+  import OpenHashSet.{INVALID_POS, NONEXISTENCE_MASK, POSITION_MASK, Hasher, LongHasher}
+
+  private val hasher: Hasher[Long] = new LongHasher
+
+  private def mask = capacity - 1
+  private def growThreshold = (loadFactor * capacity).toInt
+
+  def withFocus(focus: Int): ImmutableLongOpenHashSet =
+    new ImmutableLongOpenHashSet(data, bitset, focus, loadFactor)
+
+  /** The number of elements in the set. */
+  def size: Int = bitset.cardinality
+
+  /** The capacity of the set (i.e. size of the underlying vector). */
+  def capacity: Int = data.size
+
+  /** Return true if this set contains the specified element. */
+  def contains(k: Long): Boolean = getPos(k) != INVALID_POS
+
+  /**
+   * Nondestructively add an element to the set, returning a new set. If the set is over capacity
+   * after the insertion, grows the set and rehashes all elements.
+   */
+  def add(k: Long): ImmutableLongOpenHashSet = {
+    addWithoutResize(k).rehashIfNeeded(ImmutableLongOpenHashSet.grow, ImmutableLongOpenHashSet.move)
+  }
+
+  /**
+   * Add an element to the set. This one differs from add in that it doesn't trigger rehashing.
+   * The caller is responsible for calling rehashIfNeeded.
+   *
+   * Use (retval.focus & POSITION_MASK) to get the actual position, and
+   * (retval.focus & NONEXISTENCE_MASK) == 0 for prior existence.
+   */
+  def addWithoutResize(k: Long): ImmutableLongOpenHashSet = {
+    var pos = hashcode(hasher.hash(k)) & mask
+    var i = 1
+    var result: ImmutableLongOpenHashSet = null
+    while (result == null) {
+      if (!bitset.get(pos)) {
+        // This is a new key.
+        result = new ImmutableLongOpenHashSet(
+          data.updated(pos, k), bitset.set(pos), pos | NONEXISTENCE_MASK, loadFactor)
+      } else if (data(pos) == k) {
+        // Found an existing key.
+        result = this.withFocus(pos)
+      } else {
+        val delta = i
+        pos = (pos + delta) & mask
+        i += 1
+      }
+    }
+    result
+  }
+
+  /**
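The class comment claims that quadratic probing with a power-of-2 table explores every slot. In `addWithoutResize` the step grows by one on each collision, so the offsets from the home slot are the triangular numbers 0, 1, 3, 6, 10, ..., and it is a known result that these hit every residue modulo a power of 2. A small standalone sketch (hypothetical `ProbeSketch`, not part of the PR) that reproduces that loop makes the claim easy to check:

```scala
// Hypothetical sketch reproducing the probe loop from addWithoutResize:
// pos = (pos + i) & mask with i = 1, 2, 3, ... (triangular-number offsets).
object ProbeSketch {
  // Returns the first `capacity` positions probed from `start`.
  def probeSequence(start: Int, capacity: Int): Seq[Int] = {
    require(capacity > 0 && (capacity & (capacity - 1)) == 0, "capacity must be a power of 2")
    val mask = capacity - 1
    var pos = start & mask
    var i = 1
    val visited = scala.collection.mutable.ArrayBuffer(pos)
    while (visited.length < capacity) {
      pos = (pos + i) & mask
      i += 1
      visited += pos
    }
    visited.toSeq
  }
}
```

For a table of size 8 starting at slot 0, the probes are 0, 1, 3, 6, 2, 7, 5, 4: every slot exactly once, so an insertion can only fail to find a free slot if the table is completely full, which the load factor below 1.0 prevents.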
[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/1297#issuecomment-56199798

This looks great! My comments are minor. I know it's early to be discussing example docs, but I wanted to mention that I can see caching being an area of confusion. E.g., you wouldn't want to serialize & cache each update to an IndexedRDD, as each cached version would be a full copy and wouldn't get the benefits of the ImmutableVectors.
[GitHub] spark issue #16650: [SPARK-16554][CORE] Automatically Kill Executors and Nod...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16650

merged to master, thanks @jsoltren
[GitHub] spark pull request #16862: [SPARK-19520][streaming] Do not encrypt data writ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16862#discussion_r100414780

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -758,9 +761,33 @@ private[spark] class BlockManager(
       blockId: BlockId,
       bytes: ChunkedByteBuffer,
       level: StorageLevel,
-      tellMaster: Boolean = true): Boolean = {
+      tellMaster: Boolean = true,
+      encrypt: Boolean = false): Boolean = {
--- End diff --

I think it's worth documenting this param. At first I was going to suggest that it should be called `allowEncryption` like the other one, but I realize it's more complicated than that. Maybe something like:

If true, the given bytes should be encrypted before they are stored. Note that in most cases, the given bytes will *already* be encrypted if encryption is on. An important exception to this is the streaming WAL. Since the WAL does not support encryption, those bytes are generated unencrypted, but we still encrypt them before storing them in the block manager.

Maybe too wordy, but I think it's worth documenting.
[GitHub] spark pull request #16876: [SPARK-19537] Move pendingPartitions to ShuffleMa...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16876#discussion_r100468844

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1038,9 +1038,8 @@ class DAGScheduler(
     }
 
     if (tasks.size > 0) {
-      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
-      stage.pendingPartitions ++= tasks.map(_.partitionId)
-      logDebug("New pending partitions: " + stage.pendingPartitions)
+      logInfo(s"Submitting ${tasks.size} missing tasks (for partitions " +
+        s"${tasks.map(_.partitionId)}) from $stage (${stage.rdd})")
--- End diff --

I think including all the partitions could make this log line a little too long for an info message (what if there are 1k or 10k tasks?). I understand not wanting to stick it in a debug message (you often don't realize you want debug logging on until *after* something has failed), but I think this will be too much for regular runs. Would it still be useful if you truncate to the first 10 tasks? `tasks.take(10).map(...)`
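A sketch of the suggested truncation, assuming the log line only needs the first few partition ids plus a count of how many were left out; `LogSketch.describePartitions` is a hypothetical helper, not an existing Spark method:

```scala
object LogSketch {
  // Hypothetical helper: show at most `limit` partition ids, then say how many were omitted,
  // so an info-level log line stays short even with thousands of tasks.
  def describePartitions(partitionIds: Seq[Int], limit: Int = 10): String = {
    val shown = partitionIds.take(limit).mkString(", ")
    val omitted = partitionIds.size - limit
    if (omitted > 0) s"$shown, ... ($omitted more)" else shown
  }
}
```

Including the omitted count keeps the line informative without growing with the number of tasks.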
[GitHub] spark issue #16813: [SPARK-19466][CORE][SCHEDULER] Improve Fair Scheduler Lo...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16813

failure is probably unrelated

Jenkins, retest this please
[GitHub] spark issue #16813: [SPARK-19466][CORE][SCHEDULER] Improve Fair Scheduler Lo...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16813

@shaneknapp the build failed to trigger for me again; I triggered it manually via spark-prs.appspot.com
[GitHub] spark issue #16876: [SPARK-19537] Move pendingPartitions to ShuffleMapStage.
Github user squito commented on the issue: https://github.com/apache/spark/pull/16876

lgtm when tests pass
[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16901#discussion_r101059074

--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,63 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
       }
     }
   }
+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
--- End diff --

Can you rename it to "After a fetch failure, success ..."? Really minor, but I had to read this twice.
[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16901#discussion_r101061681

--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,63 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
       }
     }
   }
+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+    submit(rddC, Array(0, 1))
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+    // Complete both tasks in rddA.
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed on hostA for task(partitionId=0) and task(partitionId=1) is still running.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
+      result = null))
+
+    // Both original tasks in rddA should be marked as failed, because they ran on the
+    // failed hostA, so both should be resubmitted. Complete them successfully.
+    scheduler.resubmitFailedStages()
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1
+      && taskSets(2).tasks.size === 2)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // Both tasks in rddB should be resubmitted, because none of them has succeeded.
+    // Complete the task(partitionId=0) successfully. Task(partitionId=1) is still running.
+    assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1 &&
+      taskSets(3).tasks.size === 2)
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // Complete the task(partition=1) which is from the old attempt(stageId=1, stageAttempt=0)
+    // successfully.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+
+    // Thanks to the success from old attempt of stage(stageId=1, stageAttempt=0), there's no
+    // pending partitions for stage(stageId=1) now, thus downstream stage should be submitted,
+    // though there's still a running task(stageId=1, stageAttempt=1, partitionId=1)
+    // in the active stage attempt.
+    assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
+    complete(taskSets(4), Seq(
--- End diff --

I was going to suggest adding a check here to make sure that all prior TaskSetManagers are marked as zombies. But (a) you can't check that, since the DAGScheduler doesn't have a handle on the TaskSetManagers, and (b) more importantly, the prior TSMs actually are *not* marked as zombies. So they may continue to submit tasks even though they're not necessary.

I will file a separate bug for that -- it's a performance issue, not a correctness issue, so not critical. But this isn't the same as the old "kill running tasks when marking a TSM as a zombie" issue -- in this case, the problem is that the TSM may continue to launch *new* tasks.
[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16901#discussion_r101064021

--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,63 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
       }
     }
   }
+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+    submit(rddC, Array(0, 1))
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+    // Complete both tasks in rddA.
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed on hostA for task(partitionId=0) and task(partitionId=1) is still running.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
+      result = null))
+
+    // Both original tasks in rddA should be marked as failed, because they ran on the
+    // failed hostA, so both should be resubmitted. Complete them successfully.
+    scheduler.resubmitFailedStages()
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1
+      && taskSets(2).tasks.size === 2)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // Both tasks in rddB should be resubmitted, because none of them has succeeded.
+    // Complete the task(partitionId=0) successfully. Task(partitionId=1) is still running.
+    assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1 &&
+      taskSets(3).tasks.size === 2)
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // Complete the task(partition=1) which is from the old attempt(stageId=1, stageAttempt=0)
+    // successfully.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+
+    // Thanks to the success from old attempt of stage(stageId=1, stageAttempt=0), there's no
+    // pending partitions for stage(stageId=1) now, thus downstream stage should be submitted,
+    // though there's still a running task(stageId=1, stageAttempt=1, partitionId=1)
+    // in the active stage attempt.
+    assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
+    complete(taskSets(4), Seq(
--- End diff --

https://issues.apache.org/jira/browse/SPARK-19596
[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16620#discussion_r101070245

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1181,15 +1181,31 @@ class DAGScheduler(
           case smt: ShuffleMapTask =>
             val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
-            shuffleStage.pendingPartitions -= task.partitionId
             updateAccumulators(event)
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
             logDebug("ShuffleMapTask finished on " + execId)
+            if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
+              // This task was for the currently running attempt of the stage. Since the task
+              // completed successfully from the perspective of the TaskSetManager, mark it as
+              // no longer pending (the TaskSetManager may consider the task complete even
+              // when the output needs to be ignored because the task's epoch is too small below).
+              shuffleStage.pendingPartitions -= task.partitionId
--- End diff --

I think it's worth also explaining how this inconsistency between pendingPartitions and outputLocations gets resolved. IIUC, it's that when pendingPartitions is empty, the scheduler will check outputLocations, realize something is missing, and resubmit this stage.
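The reconciliation described in the comment can be illustrated with a toy model: a successful task from the current attempt always leaves the pending set, but its output may be ignored, and the gap is only noticed once nothing is pending. This is a hypothetical sketch of that reasoning as the comment understands it ("IIUC"), not the DAGScheduler code; `StageSketch`, `onTaskSuccess`, and the `stale` flag (standing in for "the task's epoch is too small") are illustrative.

```scala
// Hypothetical toy model, not Spark's DAGScheduler.
object StageSketch {
  final case class Stage(numPartitions: Int, pending: Set[Int], outputs: Map[Int, String]) {
    // Partitions with no registered map output.
    def missingPartitions: Seq[Int] = (0 until numPartitions).filterNot(outputs.contains)
  }

  sealed trait Action
  case object Wait extends Action
  case object MarkFinished extends Action
  final case class Resubmit(partitions: Seq[Int]) extends Action

  def onTaskSuccess(stage: Stage, partition: Int, location: String,
      fromCurrentAttempt: Boolean, stale: Boolean): (Stage, Action) = {
    // Only the current attempt's tasks are removed from the pending set.
    val pending = if (fromCurrentAttempt) stage.pending - partition else stage.pending
    // A stale task's output is ignored even though the task "succeeded".
    val outputs = if (stale) stage.outputs else stage.outputs + (partition -> location)
    val updated = stage.copy(pending = pending, outputs = outputs)
    if (pending.nonEmpty) (updated, Wait)
    else if (updated.missingPartitions.isEmpty) (updated, MarkFinished)
    else (updated, Resubmit(updated.missingPartitions)) // the inconsistency is resolved here
  }
}
```

In this model a stale success on partition 0 followed by a good success on partition 1 empties the pending set while partition 0 still has no output, so the stage is resubmitted for just that partition.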
[GitHub] spark pull request #16930: [SPARK-19597][CORE] test case for task deserializ...
GitHub user squito opened a pull request: https://github.com/apache/spark/pull/16930

[SPARK-19597][CORE] test case for task deserialization errors

Adds a test case that ensures that Executors gracefully handle a task that fails to deserialize, by sending back a reasonable failure message. This does not change any behavior (the prior behavior was already correct); it just adds a test case to prevent regression.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark executor_task_deserialization

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16930.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #16930
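The behavior the new test pins down, reduced to a hypothetical standalone sketch: deserialize inside a `Try` and convert any failure into a reportable reason instead of letting it escape. It uses plain Java serialization; `DeserializationSketch`, `runTask`, `Finished`, and `FailedToRun` are illustrative names, not Spark's `Executor` API.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.util.{Failure, Success, Try}

object DeserializationSketch {
  // Hypothetical stand-ins for the status an executor reports back.
  sealed trait TaskOutcome
  final case class Finished(result: String) extends TaskOutcome
  final case class FailedToRun(reason: String) extends TaskOutcome

  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.toByteArray
  }

  // Deserialize the "task" (here just a String) and run it. Any deserialization error
  // becomes a failure reason rather than an exception thrown to the caller.
  def runTask(serialized: Array[Byte]): TaskOutcome =
    Try {
      val in = new ObjectInputStream(new ByteArrayInputStream(serialized))
      try in.readObject().asInstanceOf[String] finally in.close()
    } match {
      case Success(task) => Finished(task.toUpperCase)
      case Failure(e) => FailedToRun(s"Could not deserialize task: ${e.getClass.getSimpleName}")
    }
}
```

Corrupt input such as `Array[Byte](1, 2, 3, 4)` yields a `FailedToRun` value, while a properly serialized task runs normally.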
[GitHub] spark pull request #16639: [SPARK-19276][CORE] Fetch Failure handling robust...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16639#discussion_r101090765

--- Diff: core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala ---
@@ -133,6 +123,153 @@ class ExecutorSuite extends SparkFunSuite {
         }
       }
     }
+
+  test("SPARK-19276: Handle Fetch Failed for all intervening user code") {
+    val conf = new SparkConf().setMaster("local").setAppName("executor suite test")
+    sc = new SparkContext(conf)
+
+    val serializer = SparkEnv.get.closureSerializer.newInstance()
+    val resultFunc = (context: TaskContext, itr: Iterator[Int]) => itr.size
+    val inputRDD = new FakeShuffleRDD(sc)
+    val secondRDD = new FetchFailureHidingRDD(sc, inputRDD)
+    val taskBinary = sc.broadcast(serializer.serialize((secondRDD, resultFunc)).array())
+    val serializedTaskMetrics = serializer.serialize(TaskMetrics.registered).array()
+    val task = new ResultTask(
+      stageId = 1,
+      stageAttemptId = 0,
+      taskBinary = taskBinary,
+      partition = secondRDD.partitions(0),
+      locs = Seq(),
+      outputId = 0,
+      localProperties = new Properties(),
+      serializedTaskMetrics = serializedTaskMetrics
+    )
+
+    val serTask = serializer.serialize(task)
+    val taskDescription = fakeTaskDescription(serTask)
+
+
+    val failReason = runTaskAndGetFailReason(taskDescription)
+    assert(failReason.isInstanceOf[FetchFailed])
+  }
+
+  test("Gracefully handle error in task deserialization") {
--- End diff --

@mridulm pointed out this bug in an earlier version of this PR, so I fixed the bug and added a test case. But in any case, I've separated this out into https://github.com/apache/spark/pull/16930 / https://issues.apache.org/jira/browse/SPARK-19597
[GitHub] spark pull request #16639: [SPARK-19276][CORE] Fetch Failure handling robust...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16639#discussion_r101091688

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -400,8 +410,16 @@ private[spark] class Executor(
           execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
       } catch {
-        case ffe: FetchFailedException =>
-          val reason = ffe.toTaskFailedReason
+        case t: Throwable if hasFetchFailure =>
+          val reason = task.context.fetchFailed.get.toTaskFailedReason
+          if (!t.isInstanceOf[FetchFailedException]) {
+            // there was a fetch failure in the task, but some user code wrapped that exception
+            // and threw something else. Regardless, we treat it as a fetch failure.
+            logWarning(s"TID ${taskId} encountered a ${classOf[FetchFailedException]} and " +
+              s"failed, but did not directly throw the ${classOf[FetchFailedException]}. " +
+              s"Spark is still handling the fetch failure, but these exceptions should not be " +
+              s"intercepted by user code.")
--- End diff --

@mridulm @kayousterhout how is this message? I'm open to other suggestions. I'm not sure exactly what to recommend to the user instead.
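The new catch clause keys off state recorded on the task context rather than the type of the exception that was actually thrown. A hypothetical standalone sketch of that idea, where `Context`, `FetchFailedError`, and `run` are illustrative stand-ins for `TaskContext`, `FetchFailedException`, and the executor's task loop (not their real APIs):

```scala
// Hypothetical sketch, not Spark's Executor: the context remembers the fetch failure,
// so it is still reported as a fetch failure even if user code wraps it in another exception.
object FetchFailureSketch {
  final class FetchFailedError(val host: String) extends Exception(s"fetch failed from $host")

  final class Context {
    var fetchFailed: Option[FetchFailedError] = None
  }

  sealed trait Reason
  final case class FetchFailure(host: String) extends Reason
  final case class ExceptionFailure(message: String) extends Reason

  // Runs the task body; returns None on success, or the reason the task failed.
  def run(context: Context)(body: => Unit): Option[Reason] =
    try {
      body
      None
    } catch {
      case t: Throwable if context.fetchFailed.isDefined =>
        if (!t.isInstanceOf[FetchFailedError]) {
          println(s"warning: task threw ${t.getClass.getSimpleName} after a fetch failure")
        }
        Some(FetchFailure(context.fetchFailed.get.host))
      case t: Throwable =>
        Some(ExceptionFailure(Option(t.getMessage).getOrElse(t.toString)))
    }

  // Simulates a shuffle read that fails: record the failure on the context, then throw.
  def fetch(context: Context, host: String): Nothing = {
    val e = new FetchFailedError(host)
    context.fetchFailed = Some(e)
    throw e
  }
}
```

Recording the failure on the context at the point where it happens is what makes the wrapped exception irrelevant: the catch clause no longer depends on user code rethrowing the original exception type.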
[GitHub] spark issue #16930: [SPARK-19597][CORE] test case for task deserialization e...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16930

Jenkins, retest this please
[GitHub] spark issue #16639: [SPARK-19276][CORE] Fetch Failure handling robust to use...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16639

Jenkins, retest this please
[GitHub] spark issue #16978: [SPARK-19652][UI] Do auth checks for REST API access.
Github user squito commented on the issue: https://github.com/apache/spark/pull/16978 code changes look fine, but is it possible to add a test case to HistoryServerSuite? I worry that nobody is going to think about security or know how to test it, so it may easily get broken.
[GitHub] spark issue #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user squito commented on the issue: https://github.com/apache/spark/pull/16901 sorry for responding late to this, but your analysis sounds fine
[GitHub] spark pull request #16892: [SPARK-19560] Improve DAGScheduler tests.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16892#discussion_r102281985 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -1569,24 +1569,44 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } - test("run trivial shuffle with out-of-band failure and retry") { + /** + * In this test, we run a map stage where one of the executors fails but we still receive a + * "zombie" complete message from a task that ran on that executor. We want to make sure the + * stage is resubmitted so that the task that ran on the failed executor is re-executed, and + * that the stage is only marked as finished once that task completes. + */ + test("run trivial shuffle with out-of-band executor failure and retry") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) val shuffleId = shuffleDep.shuffleId val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker) submit(reduceRdd, Array(0)) -// blockManagerMaster.removeExecutor("exec-hostA") -// pretend we were told hostA went away +// Tell the DAGScheduler that hostA was lost. runEvent(ExecutorLost("exec-hostA", ExecutorKilled)) -// DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks -// rather than marking it is as failed and waiting. complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1 + +// At this point, no more tasks are running for the stage (and the TaskSetManager considers the +// stage complete), but the tasks that ran on HostA need to be re-run, so the DAGScheduler +// should re-submit the stage. 
+assert(taskSets.size === 2) + +// Make sure that the stage that was re-submitted was the ShuffleMapStage (not the reduce +// stage, which shouldn't be run until all of the tasks in the ShuffleMapStage complete on +// alive executors). +assert(taskSets(1).tasks(0).isInstanceOf[ShuffleMapTask]) --- End diff -- do you think it's worth adding ```scala assert(taskSets(1).tasks.size === 1) ``` here, to make sure that only the one task is resubmitted, not both? If it weren't true, the test would fail later on anyway, but it might be helpful to get a more meaningful, earlier error message. Not necessary; up to you whether it's worth adding.
[GitHub] spark pull request #16892: [SPARK-19560] Improve DAGScheduler tests.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16892#discussion_r102282650 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2031,6 +2051,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou * In this test, we run a map stage where one of the executors fails but we still receive a * "zombie" complete message from that executor. We want to make sure the stage is not reported * as done until all tasks have completed. + * + * Most of the functionality in this test is tested in "run trivial shuffle with out-of-band + * executor failure and retry". However, that test uses ShuffleMapStages that are followed by + * a ResultStage, whereas in this test, the ShuffleMapStage is tested in isolation, without a + * ResultStage after it. --- End diff -- I should have looked more closely at this test earlier... I was hoping this was testing a *multi*-stage map job. That would really be a better test. Ideally we'd even have three stages, with a failure happening in the second stage and in the last stage. In any case, your changes still look good; no need to do those other things now.
[GitHub] spark issue #16989: [SPARK-19659] Fetch big blocks to disk when shuffle-read...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16989 Hi @jinxing64, I posted a comment on JIRA about the design. I think this is a big enough change that it's worth discussing the design first. It's fine to keep working on the code as a demonstration if you want, but for now I'd ask that you label this a work-in-progress "[WIP]". (I personally have only briefly glanced at the code and am unlikely to look more closely until we sort out the design issues.) fwiw, I think this will be a great feature; we just need to be thoughtful about it.
[GitHub] spark issue #16989: [SPARK-19659] Fetch big blocks to disk when shuffle-read...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16989 Jenkins, add to whitelist
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r102293021 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -911,14 +916,14 @@ private[spark] class TaskSetManager( logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation) if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) { val time = clock.getTimeMillis() - val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray - Arrays.sort(durations) + val durations = successfulTasksSet.toArray.map(taskInfos(_).duration) val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1)) val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation) // TODO: Threshold should also look at standard deviation of task durations and have a lower // bound based on that. logDebug("Task length threshold for speculation: " + threshold) - for ((tid, info) <- taskInfos) { + for (tid <- runningTasksSet) { +val info = taskInfos(tid) --- End diff -- oh, good find, this alone looks like a worthwhile fix.
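The gain from looping over `runningTasksSet` instead of all of `taskInfos` is that each speculation check only touches tasks that can still be speculated, rather than every attempt ever recorded for the stage. A minimal sketch under simplified assumptions: `TaskInfoLike`, `speculatableTasks` and the plain elapsed-time test are hypothetical, and the real `TaskSetManager` check may involve additional conditions.

```scala
import scala.collection.mutable

object SpeculationScanSketch {
  // Hypothetical, simplified per-attempt record.
  final case class TaskInfoLike(launchTime: Long, finished: Boolean)

  /**
   * Returns the ids of running tasks whose elapsed time exceeds `threshold`.
   * Only ids in `runningTasks` are visited, so the cost of each check scales with the number
   * of running tasks, not with every attempt stored in `taskInfos`.
   */
  def speculatableTasks(
      taskInfos: mutable.Map[Long, TaskInfoLike],
      runningTasks: collection.Set[Long],
      now: Long,
      threshold: Double): Seq[Long] = {
    runningTasks.toSeq.sorted.filter { tid =>
      val info = taskInfos(tid)
      // Defensive: skip anything already marked finished.
      !info.finished && (now - info.launchTime) > threshold
    }
  }
}
```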
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r102292835 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -911,14 +916,14 @@ private[spark] class TaskSetManager( logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation) if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) { val time = clock.getTimeMillis() - val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray - Arrays.sort(durations) + val durations = successfulTasksSet.toArray.map(taskInfos(_).duration) --- End diff -- there are some other things you could do to make the original code more efficient: (a) instead of `.values.filter.map`, use a `foreach` to add directly into an `ArrayBuffer`. That avoids all the intermediate collections that Scala would otherwise create. (b) store an approximate distribution of the runtimes, e.g. using a t-digest. Aside: what a pain that there is no quick way to get the middle element of a TreeSet; I couldn't find anything efficient in either the Java or Scala libs :(
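Suggestion (a) could look roughly like this: a single `foreach` appends successful durations straight into an `ArrayBuffer`, so no intermediate filtered or mapped collections are built, and the buffer is sorted once. `TaskInfoLike` and `medianSuccessfulDuration` are hypothetical names; the index rule `min(round(0.5 * n), n - 1)` is copied from the diff. Suggestion (b) is not sketched here.

```scala
import scala.collection.mutable.ArrayBuffer

object MedianDurationSketch {
  // Hypothetical, simplified task record.
  final case class TaskInfoLike(successful: Boolean, duration: Long)

  def medianSuccessfulDuration(taskInfos: Iterable[TaskInfoLike]): Option[Long] = {
    // One pass, one buffer: no intermediate collections from filter/map.
    val durations = new ArrayBuffer[Long]()
    taskInfos.foreach { info =>
      if (info.successful) durations += info.duration
    }
    if (durations.isEmpty) {
      None
    } else {
      val sorted = durations.toArray
      java.util.Arrays.sort(sorted)
      // Same index rule as the diff: min(round(0.5 * n), n - 1).
      val idx = math.min((0.5 * sorted.length).round.toInt, sorted.length - 1)
      Some(sorted(idx))
    }
  }
}
```

For four successful tasks with durations 40, 10, 30 and 20, the rule picks index 2 of the sorted array, i.e. 30 (the upper middle element).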
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16867 Jenkins, ok to test
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r102292628 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -137,6 +137,12 @@ private[spark] class TaskSetManager( // Task index, start and finish time for each task attempt (indexed by task ID) val taskInfos = new HashMap[Long, TaskInfo] + val successfulTasksSet = new scala.collection.mutable.TreeSet[Long] { --- End diff -- this can't be a set, since multiple tasks might run for the same amount of time, and a set ordered by duration would treat those tasks as duplicates. You could change it to a set of `(time, count)` pairs, which would make the update logic a bit more complicated, or just keep a list of all runtimes. Also, the name should indicate that it holds the durations, e.g. `successfulTaskDurations`.
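The `(time, count)` alternative amounts to a sorted multiset. A minimal sketch, assuming durations are plain `Long` values: a `mutable.TreeMap` from duration to count keeps every task accounted for, where a `TreeSet` would keep only one entry per distinct duration. `SuccessfulTaskDurations` and its methods are hypothetical; the median walk is linear in the number of distinct durations and reuses the diff's index rule.

```scala
import scala.collection.mutable

object DurationMultisetSketch {
  // Hypothetical multiset of successful-task durations: duration -> number of tasks.
  final class SuccessfulTaskDurations {
    private val counts = mutable.TreeMap.empty[Long, Int]
    private var total = 0

    def add(duration: Long): Unit = {
      counts(duration) = counts.getOrElse(duration, 0) + 1
      total += 1
    }

    def size: Int = total

    // Walks durations in ascending order until the running count passes the target index.
    def median: Option[Long] = {
      if (total == 0) {
        None
      } else {
        val target = math.min((0.5 * total).round.toInt, total - 1)
        var seen = 0
        var result = 0L
        val it = counts.iterator
        while (seen <= target && it.hasNext) {
          val (duration, count) = it.next()
          seen += count
          result = duration
        }
        Some(result)
      }
    }
  }
}
```

The reviewer's concern in miniature: `scala.collection.mutable.TreeSet(10L, 10L, 20L, 30L)` has size 3, while the multiset above records all four durations and returns 20 as the element at index 2.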
[GitHub] spark issue #16978: [SPARK-19652][UI] Do auth checks for REST API access.
Github user squito commented on the issue: https://github.com/apache/spark/pull/16978 lgtm assuming tests pass
[GitHub] spark pull request #16946: [SPARK-19554][UI,YARN] Allow SHS URL to be used f...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16946#discussion_r102546890 --- Diff: docs/running-on-yarn.md --- @@ -604,3 +604,17 @@ spark.yarn.am.extraJavaOptions -Dsun.security.krb5.debug=true -Dsun.security.spn Finally, if the log level for `org.apache.spark.deploy.yarn.Client` is set to `DEBUG`, the log will include a list of all tokens obtained, and their expiry details + +## Using the Spark History Server to replace the Spark Web UI + +It is possible to use the Spark History Server application page as the tracking URL for running +applications in scenarios where it may be desired to disable the built-in application UI. Two steps --- End diff -- nit: the first sentence reads a little funny. Maybe rephrase to: "It is possible to use the Spark History Server application page as the tracking URL for running applications when the built-in application UI is disabled. This may be desirable on secure clusters, or to reduce memory usage on the driver from the UI." Up to you; it maybe doesn't even need the second sentence.
[GitHub] spark pull request #16946: [SPARK-19554][UI,YARN] Allow SHS URL to be used f...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16946#discussion_r102539475 --- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnProxyRedirectFilterSuite.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.io.{PrintWriter, StringWriter} +import javax.servlet.FilterChain +import javax.servlet.http.{Cookie, HttpServletRequest, HttpServletResponse} + +import org.mockito.Mockito._ + +import org.apache.spark.SparkFunSuite + +class YarnProxyRedirectFilterSuite extends SparkFunSuite { + + test("redirect proxied requests, pass-through others") { +val requestURL = "http://example.com:1234/foo?"; +val filter = new YarnProxyRedirectFilter() +val cookies = Array(new Cookie(YarnProxyRedirectFilter.COOKIE_NAME, "dr.who")) + +val req = mock(classOf[HttpServletRequest]) +when(req.getCookies()).thenReturn(cookies, null) --- End diff -- I was really confused by this test at first; I didn't know that `thenReturn` lets you specify multiple values for consecutive calls. For anyone else as clueless as me, it would be helpful to drop in a comment here to draw attention to this, e.g.
"First request has cookies with a user name, second request does not".
[GitHub] spark pull request #16892: [SPARK-19560] Improve DAGScheduler tests.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16892#discussion_r102996593 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2031,6 +2051,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou * In this test, we run a map stage where one of the executors fails but we still receive a * "zombie" complete message from that executor. We want to make sure the stage is not reported * as done until all tasks have completed. + * + * Most of the functionality in this test is tested in "run trivial shuffle with out-of-band + * executor failure and retry". However, that test uses ShuffleMapStages that are followed by + * a ResultStage, whereas in this test, the ShuffleMapStage is tested in isolation, without a + * ResultStage after it. --- End diff -- the *final* map stage can't have anything that follows it, but the job overall can still have multiple stages, and the failure can occur while processing one of those earlier map stages or the final one. In any case, I agree you don't need to expand that test in this PR. And even though this test doesn't do as much as I was hoping, I still think it adds value and is worth leaving in, even though it's very similar to the other test.
[GitHub] spark issue #16930: [SPARK-19597][CORE] test case for task deserialization e...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16930 thanks @kayousterhout !
[GitHub] spark issue #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage policies tes...
Github user squito commented on the issue: https://github.com/apache/spark/pull/15604 Jenkins, ok to test
[GitHub] spark pull request #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage polic...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15604#discussion_r94497574 --- Diff: core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala --- @@ -178,4 +180,97 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { scheduleTaskAndVerifyId(2, rootPool, 6) scheduleTaskAndVerifyId(3, rootPool, 2) } + + test("SPARK-18066: FIFO Scheduler just uses root pool") { +sc = new SparkContext("local", "PoolSuite") +val taskScheduler = new TaskSchedulerImpl(sc) + +val rootPool = new Pool("", SchedulingMode.FIFO, initMinShare = 0, initWeight = 0) +val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) + +val taskSetManager0 = createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler) +val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, taskScheduler) + +val properties = new Properties() +properties.setProperty("spark.scheduler.pool", TEST_POOL) + +// FIFOSchedulableBuilder just uses rootPool so even if properties are set, related pool +// (testPool) is not created and TaskSetManagers are added to rootPool +schedulableBuilder.addTaskSetManager(taskSetManager0, properties) +schedulableBuilder.addTaskSetManager(taskSetManager1, properties) + +assert(rootPool.getSchedulableByName(TEST_POOL) == null) +assert(rootPool.schedulableQueue.size == 2) +assert(rootPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) +assert(rootPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) + } + + test("SPARK-18066: FAIR Scheduler uses default pool when spark.scheduler.pool property is not " + +"set") { +sc = new SparkContext("local", "PoolSuite") +val taskScheduler = new TaskSchedulerImpl(sc) + +val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0) +val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) +schedulableBuilder.buildPools() + +// FAIR Scheduler uses default pool when pool properties are null +val taskSetManager0 = 
createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler) + +schedulableBuilder.addTaskSetManager(taskSetManager0, null) + +val defaultPool = rootPool.getSchedulableByName(schedulableBuilder.DEFAULT_POOL_NAME) +assert(defaultPool != null) +assert(defaultPool.schedulableQueue.size == 1) +assert(defaultPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) + +// FAIR Scheduler uses default pool when spark.scheduler.pool property is not set +val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, taskScheduler) + +schedulableBuilder.addTaskSetManager(taskSetManager1, new Properties()) + +assert(defaultPool.schedulableQueue.size == 2) +assert(defaultPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) + +// FAIR Scheduler uses default pool when spark.scheduler.pool property is set as default pool --- End diff -- I'm all for more tests, but this final case here seems pretty unnecessary
[GitHub] spark pull request #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage polic...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15604#discussion_r94499675 --- Diff: core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala --- @@ -178,4 +180,97 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { scheduleTaskAndVerifyId(2, rootPool, 6) scheduleTaskAndVerifyId(3, rootPool, 2) } + + test("SPARK-18066: FIFO Scheduler just uses root pool") { --- End diff -- super nit: the JIRA doesn't really say anything other than "add these tests", so there's not much point in listing it here; just name the test "FIFO Scheduler just uses root pool".
[GitHub] spark pull request #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage polic...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15604#discussion_r94499385 --- Diff: core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala --- @@ -178,4 +180,97 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { scheduleTaskAndVerifyId(2, rootPool, 6) scheduleTaskAndVerifyId(3, rootPool, 2) } + + test("SPARK-18066: FIFO Scheduler just uses root pool") { +sc = new SparkContext("local", "PoolSuite") +val taskScheduler = new TaskSchedulerImpl(sc) + +val rootPool = new Pool("", SchedulingMode.FIFO, initMinShare = 0, initWeight = 0) +val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) + +val taskSetManager0 = createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler) +val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, taskScheduler) + +val properties = new Properties() +properties.setProperty("spark.scheduler.pool", TEST_POOL) + +// FIFOSchedulableBuilder just uses rootPool so even if properties are set, related pool +// (testPool) is not created and TaskSetManagers are added to rootPool +schedulableBuilder.addTaskSetManager(taskSetManager0, properties) +schedulableBuilder.addTaskSetManager(taskSetManager1, properties) + +assert(rootPool.getSchedulableByName(TEST_POOL) == null) +assert(rootPool.schedulableQueue.size == 2) +assert(rootPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) +assert(rootPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) + } + + test("SPARK-18066: FAIR Scheduler uses default pool when spark.scheduler.pool property is not " + +"set") { +sc = new SparkContext("local", "PoolSuite") +val taskScheduler = new TaskSchedulerImpl(sc) + +val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0) +val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) +schedulableBuilder.buildPools() + +// FAIR Scheduler uses default pool when pool properties are null +val taskSetManager0 = 
createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler) + +schedulableBuilder.addTaskSetManager(taskSetManager0, null) + +val defaultPool = rootPool.getSchedulableByName(schedulableBuilder.DEFAULT_POOL_NAME) +assert(defaultPool != null) +assert(defaultPool.schedulableQueue.size == 1) +assert(defaultPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) + +// FAIR Scheduler uses default pool when spark.scheduler.pool property is not set +val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, taskScheduler) + +schedulableBuilder.addTaskSetManager(taskSetManager1, new Properties()) + +assert(defaultPool.schedulableQueue.size == 2) +assert(defaultPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) + +// FAIR Scheduler uses default pool when spark.scheduler.pool property is set as default pool +val taskSetManager2 = createTaskSetManager(stageId = 2, numTasks = 1, taskScheduler) + +val properties = new Properties() +properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, schedulableBuilder + .DEFAULT_POOL_NAME) + +schedulableBuilder.addTaskSetManager(taskSetManager2, properties) + +assert(defaultPool.schedulableQueue.size == 3) +assert(defaultPool.getSchedulableByName(taskSetManager2.name) === taskSetManager2) + } + + test("SPARK-18066: FAIR Scheduler creates a new pool when spark.scheduler.pool property points " + +"non-existent") { --- End diff -- minor nit: rename to "FAIR Scheduler creates a new pool when spark.scheduler.pool property points to a non-existent pool" bigger question: @markhamstra @kayousterhout Is this really the desired behavior? Or is there a bug -- should it fail fast? It looks like [this behavior was intentional](https://github.com/apache/spark/blob/b67b35f76b684c5176dc683e7491fd01b43f4467/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala#L143-L145). 
Then again, that comment makes it sound like the user is going to directly modify the `weight` and `minShare` of the constructed pool, but though [they are `var`s](https://github.com/apache/spark/blob/b67b35f76b684c5176dc683e7491fd01b43f4467/core/src/main/scala/org/apache/spark/scheduler/Pool.scala#L40-L41), it's `private[spark]` and there isn't anything else exposing it to the user. I prefer fail-fast behavior, but it see
[GitHub] spark pull request #15237: [SPARK-17663] [CORE] SchedulableBuilder should ha...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15237#discussion_r94501620 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala --- @@ -102,38 +103,52 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) for (poolNode <- (xml \\ POOLS_PROPERTY)) { val poolName = (poolNode \ POOL_NAME_PROPERTY).text - var schedulingMode = DEFAULT_SCHEDULING_MODE - var minShare = DEFAULT_MINIMUM_SHARE - var weight = DEFAULT_WEIGHT - - val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text - if (xmlSchedulingMode != "") { -try { - schedulingMode = SchedulingMode.withName(xmlSchedulingMode) -} catch { - case e: NoSuchElementException => -logWarning(s"Unsupported schedulingMode: $xmlSchedulingMode, " + - s"using the default schedulingMode: $schedulingMode") -} - } - val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text - if (xmlMinShare != "") { -minShare = xmlMinShare.toInt - } + val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase + val schedulingMode = getSchedulingModeValue(xmlSchedulingMode, DEFAULT_SCHEDULING_MODE) - val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text - if (xmlWeight != "") { -weight = xmlWeight.toInt - } + val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text.trim + val minShare = getIntValue(MINIMUM_SHARES_PROPERTY, xmlMinShare, DEFAULT_MINIMUM_SHARE) + + val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text.trim + val weight = getIntValue(WEIGHT_PROPERTY, xmlWeight, DEFAULT_WEIGHT) + + rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight)) - val pool = new Pool(poolName, schedulingMode, minShare, weight) - rootPool.addSchedulable(pool) logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( poolName, schedulingMode, minShare, weight)) } } + private def getSchedulingModeValue(data: String, defaultValue: SchedulingMode): SchedulingMode = { +val warningMessage = 
s"Unsupported schedulingMode: $data, using the default schedulingMode: " + + s"$defaultValue" +try { + if (SchedulingMode.withName(data) != SchedulingMode.NONE) { +SchedulingMode.withName(data) + } else { +logWarning(warningMessage) +defaultValue + } +} catch { + case e: NoSuchElementException => +logWarning(warningMessage) +defaultValue +} + } + + private def getIntValue(propertyName: String, data: String, defaultValue: Int): Int = { +try { + data.toInt +} catch { + case e: NumberFormatException => +logWarning(s"Error while loading scheduler allocation file at $schedulerAllocFile. " + --- End diff -- ugh, this is kind of a nuisance, but I realize now that `schedulerAllocFile` isn't necessarily the right file -- that might be empty, and there might be a `fairscheduler.xml` sitting on the classpath. Can you get the right file name in both cases? (Better to leave it to not include any filename, than to And can the warning include the poolname as well? Finally, it would be nice to add this extra info to the mode warning too. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15237: [SPARK-17663] [CORE] SchedulableBuilder should ha...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15237#discussion_r94501723 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala --- @@ -102,38 +103,52 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) for (poolNode <- (xml \\ POOLS_PROPERTY)) { val poolName = (poolNode \ POOL_NAME_PROPERTY).text - var schedulingMode = DEFAULT_SCHEDULING_MODE - var minShare = DEFAULT_MINIMUM_SHARE - var weight = DEFAULT_WEIGHT - - val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text - if (xmlSchedulingMode != "") { -try { - schedulingMode = SchedulingMode.withName(xmlSchedulingMode) -} catch { - case e: NoSuchElementException => -logWarning(s"Unsupported schedulingMode: $xmlSchedulingMode, " + - s"using the default schedulingMode: $schedulingMode") -} - } - val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text - if (xmlMinShare != "") { -minShare = xmlMinShare.toInt - } + val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase + val schedulingMode = getSchedulingModeValue(xmlSchedulingMode, DEFAULT_SCHEDULING_MODE) --- End diff -- this is minor, but if you're going to have a helper method here, can you do *all* the parsing inside it? Include the one line above, `(poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase`. Same goes for `getIntValue`.
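A rough sketch of what the requested helpers could look like, with *all* the parsing (trim, upper-casing, validation, fallback) done inside them, and with the pool name and file name included in every warning as asked above. Everything here is hypothetical: `PoolPropertyParser` is an illustrative name, the pool node is modelled as a `Map[String, String]` instead of a `scala.xml.Node` so the sketch stays self-contained, the `SchedulingMode` object is a stand-in for Spark's enumeration, and the map keys just mirror the XML element names.

```scala
import java.util.Locale
import scala.util.Try

// Stand-in for Spark's SchedulingMode enumeration (FAIR, FIFO, NONE).
object SchedulingMode extends Enumeration {
  val FAIR, FIFO, NONE = Value
}

// Hypothetical helper: each pool's raw properties arrive as a Map standing in
// for the XML pool node, so trimming, case-folding, validation and fallback
// all live in one place.
class PoolPropertyParser(fileName: String, warn: String => Unit) {

  def schedulingMode(
      pool: Map[String, String],
      poolName: String,
      default: SchedulingMode.Value): SchedulingMode.Value = {
    val raw = pool.getOrElse("schedulingMode", "").trim.toUpperCase(Locale.ROOT)
    // NONE parses as an enum value but is not usable for a pool, so reject it too.
    Try(SchedulingMode.withName(raw)).toOption.filter(_ != SchedulingMode.NONE).getOrElse {
      warn(s"Unsupported schedulingMode: '$raw' for pool $poolName in $fileName, " +
        s"using the default schedulingMode: $default")
      default
    }
  }

  def intValue(pool: Map[String, String], property: String, poolName: String, default: Int): Int = {
    val raw = pool.getOrElse(property, "").trim
    // Blank or non-numeric values fall back to the default, with a warning.
    Try(raw.toInt).getOrElse {
      warn(s"Invalid $property '$raw' for pool $poolName in $fileName, " +
        s"using the default $property: $default")
      default
    }
  }
}
```

In the real builder, `fileName` would have to be whichever file was actually loaded (the configured `schedulerAllocFile` or a `fairscheduler.xml` found on the classpath), which is exactly the open question above.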
[GitHub] spark pull request #15237: [SPARK-17663] [CORE] SchedulableBuilder should ha...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15237#discussion_r94500060 --- Diff: core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala --- @@ -178,4 +176,39 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { scheduleTaskAndVerifyId(2, rootPool, 6) scheduleTaskAndVerifyId(3, rootPool, 2) } + + test("SPARK-17663: FairSchedulableBuilder sets default values for blank or invalid datas") { +val xmlPath = getClass.getClassLoader.getResource("fairscheduler-with-invalid-data.xml") + .getFile() +val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath) + +val rootPool = new Pool("", FAIR, 0, 0) +val schedulableBuilder = new FairSchedulableBuilder(rootPool, conf) +schedulableBuilder.buildPools() + +verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO) +verifyPool(rootPool, "pool_with_invalid_min_share", 0, 2, FAIR) +verifyPool(rootPool, "pool_with_invalid_weight", 1, 1, FAIR) +verifyPool(rootPool, "pool_with_invalid_scheduling_mode", 3, 2, FIFO) +verifyPool(rootPool, "pool_with_non_uppercase_scheduling_mode", 2, 1, FAIR) +verifyPool(rootPool, "pool_with_NONE_scheduling_mode", 1, 2, FIFO) +verifyPool(rootPool, "pool_with_whitespace_min_share", 0, 2, FAIR) +verifyPool(rootPool, "pool_with_whitespace_weight", 1, 1, FAIR) +verifyPool(rootPool, "pool_with_whitespace_scheduling_mode", 3, 2, FIFO) +verifyPool(rootPool, "pool_with_empty_min_share", 0, 3, FAIR) +verifyPool(rootPool, "pool_with_empty_weight", 2, 1, FAIR) +verifyPool(rootPool, "pool_with_empty_scheduling_mode", 2, 2, FIFO) +verifyPool(rootPool, "pool_with_min_share_surrounded_whitespace", 3, 2, FAIR) +verifyPool(rootPool, "pool_with_weight_surrounded_whitespace", 1, 2, FAIR) +verifyPool(rootPool, "pool_with_scheduling_mode_surrounded_whitespace", 3, 2, FAIR) --- End diff -- nit: can you just combine all these "surrounded_whitespace" cases into one for all the properties? 
[GitHub] spark issue #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage policies tes...
Github user squito commented on the issue: https://github.com/apache/spark/pull/15604 hmm, so I'm thinking about this again, along with your other PR https://github.com/apache/spark/pull/15237, and it got me thinking -- there also aren't any tests for a mix of FIFO and FAIR (at least, I don't see it in "Nested Pool Test" in PoolSuite). While you are doing this, would you like to add some tests for that as well? (To be perfectly honest, I need to spend a little bit of time thinking about what that behavior should be anyway, but I will have to do that a little later.)
[GitHub] spark pull request #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage polic...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15604#discussion_r94670256 --- Diff: core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala --- @@ -178,4 +180,97 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { scheduleTaskAndVerifyId(2, rootPool, 6) scheduleTaskAndVerifyId(3, rootPool, 2) } + + test("SPARK-18066: FIFO Scheduler just uses root pool") { +sc = new SparkContext("local", "PoolSuite") +val taskScheduler = new TaskSchedulerImpl(sc) + +val rootPool = new Pool("", SchedulingMode.FIFO, initMinShare = 0, initWeight = 0) +val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) + +val taskSetManager0 = createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler) +val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, taskScheduler) + +val properties = new Properties() +properties.setProperty("spark.scheduler.pool", TEST_POOL) + +// FIFOSchedulableBuilder just uses rootPool so even if properties are set, related pool +// (testPool) is not created and TaskSetManagers are added to rootPool +schedulableBuilder.addTaskSetManager(taskSetManager0, properties) +schedulableBuilder.addTaskSetManager(taskSetManager1, properties) + +assert(rootPool.getSchedulableByName(TEST_POOL) == null) +assert(rootPool.schedulableQueue.size == 2) +assert(rootPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) +assert(rootPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) + } + + test("SPARK-18066: FAIR Scheduler uses default pool when spark.scheduler.pool property is not " + +"set") { +sc = new SparkContext("local", "PoolSuite") +val taskScheduler = new TaskSchedulerImpl(sc) + +val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0) +val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) +schedulableBuilder.buildPools() + +// FAIR Scheduler uses default pool when pool properties are null +val taskSetManager0 = 
createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler) + +schedulableBuilder.addTaskSetManager(taskSetManager0, null) + +val defaultPool = rootPool.getSchedulableByName(schedulableBuilder.DEFAULT_POOL_NAME) +assert(defaultPool != null) +assert(defaultPool.schedulableQueue.size == 1) +assert(defaultPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) + +// FAIR Scheduler uses default pool when spark.scheduler.pool property is not set +val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, taskScheduler) + +schedulableBuilder.addTaskSetManager(taskSetManager1, new Properties()) + +assert(defaultPool.schedulableQueue.size == 2) +assert(defaultPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) + +// FAIR Scheduler uses default pool when spark.scheduler.pool property is set as default pool +val taskSetManager2 = createTaskSetManager(stageId = 2, numTasks = 1, taskScheduler) + +val properties = new Properties() +properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, schedulableBuilder + .DEFAULT_POOL_NAME) + +schedulableBuilder.addTaskSetManager(taskSetManager2, properties) + +assert(defaultPool.schedulableQueue.size == 3) +assert(defaultPool.getSchedulableByName(taskSetManager2.name) === taskSetManager2) + } + + test("SPARK-18066: FAIR Scheduler creates a new pool when spark.scheduler.pool property points " + +"non-existent") { --- End diff -- yeah I prefer failing fast for the same reason. But I guess we can't actually change this, since there is a valid use case for this behavior. We don't really know if others are using this, and I don't think we should change it under them in the next release. In any case, I still think we should also change `Pool.weight` and `Pool.minShare` to `val`s.
[GitHub] spark issue #16376: [SPARK-18967][SCHEDULER] compute locality levels even if...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16376 Thanks for the feedback. I've updated the comment on `myLocalityLevels`. Also I updated the tests slightly. To ensure that we're really testing no delay, I updated the tests to use a manual clock which never advances. And to address @lirui-intel's concern, I also added a test that we still schedule at non-preferred locations immediately with delay scheduling off. (Somewhat obvious in the current implementation since we always add `ANY`, but I think it's a good regression test if nothing else.)
[GitHub] spark pull request #16376: [SPARK-18967][SCHEDULER] compute locality levels ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16376#discussion_r94835320 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -159,7 +159,12 @@ private[spark] class TaskSetManager( addPendingTask(i) } - // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling + /** + * Track the set of locality levels which are valid given the tasks locality preferences and + * the set of currently available executors. This is updated as executors are added and removed. + * This allows a performance optimization, of skipping levels that aren't relevant (eg., skip + * PROCESS_LOCAL if no tasks could be run PROCESS_LOCAL for the current set of executors). + */ var myLocalityLevels = computeValidLocalityLevels() --- End diff -- As I was figuring out the purpose of this for what to put in the comment, I made a couple of observations: 1) For each executor we add or remove, it's an O(numExecutors) operation to update the locality levels. So overall it's O(numExecutors^2) to add a bunch. Minor on small clusters, but I wonder if this is an issue when you're using dynamic allocation and going up and down to 1000s of executors. It's all happening with a lock on the `TaskSchedulerImpl` too. 2) Though we recompute valid locality levels as executors come and go, we do *not* recompute them as tasks complete. That's not a problem -- as offers come in, we still go through the right task lists. But it does make me wonder whether this business of updating the locality levels for the current set of executors is useful, or whether we should instead just always use all levels.
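To make observation 1) concrete, here is a back-of-the-envelope model, not Spark's code: it simply assumes each executor registration triggers one recompute that touches every live executor once. Under that assumption, adding n executors costs 1 + 2 + ... + n = n(n+1)/2 executor visits, i.e. quadratic in n. `scansToAddExecutors` is a hypothetical name.

```scala
// Hypothetical cost model: every executor add triggers a recomputation that
// scans all currently registered executors once (O(current executor count)).
def scansToAddExecutors(n: Int): Long = {
  var executors = 0
  var scans = 0L
  for (_ <- 1 to n) {
    executors += 1      // a new executor registers
    scans += executors  // recompute locality levels over every live executor
  }
  scans
}

// Scaling up to 1000 executors one at a time: 1000 * 1001 / 2 = 500500 visits.
val total = scansToAddExecutors(1000)
```

With dynamic allocation repeatedly scaling up and down, that total is paid again on each ramp, all under the `TaskSchedulerImpl` lock.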
[GitHub] spark issue #16053: [SPARK-17931] Eliminate unncessary task (de) serializati...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16053 Jenkins, retest this please
[GitHub] spark issue #16053: [SPARK-17931] Eliminate unncessary task (de) serializati...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16053 lgtm, assuming tests pass
[GitHub] spark issue #16053: [SPARK-17931] Eliminate unncessary task (de) serializati...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16053 merged to master
[GitHub] spark pull request #16489: [SPARK-18836][MINOR] remove obsolete param doc fo...
GitHub user squito opened a pull request: https://github.com/apache/spark/pull/16489 [SPARK-18836][MINOR] remove obsolete param doc for metrics ## What changes were proposed in this pull request? 4cb49412d1d7d10ffcc738475928c7de2bc59fd4 removed the `metrics` parameter from Task, but left it in the docs -- this removes it from the docs as well. (The original commit removed it from the docs for ResultTask and ShuffleMapTask, but not the parent Task.) ## How was this patch tested? jenkins You can merge this pull request into a Git repository by running: $ git pull https://github.com/squito/spark SPARK-18836_doc_fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16489.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16489 commit 974921dca1a74fcb4fff4fdc6c3bdba314f681ec Author: Imran Rashid Date: 2017-01-06T17:15:03Z remove obsolete param doc
[GitHub] spark issue #16489: [SPARK-18836][MINOR] remove obsolete param doc for metri...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16489 @shivaram @kayousterhout super minor follow-up to https://github.com/apache/spark/pull/16261 that I just noticed.
[GitHub] spark issue #16489: [SPARK-18836][MINOR] remove obsolete param doc for metri...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16489 oh thanks @srowen, I didn't know it could do that. I'll update with a larger list of fixes then.
[GitHub] spark pull request #16489: [SPARK-18836][MINOR] remove obsolete param doc fo...
Github user squito closed the pull request at: https://github.com/apache/spark/pull/16489
[GitHub] spark issue #16376: [SPARK-18967][SCHEDULER] compute locality levels even if...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16376 Jenkins, retest this please
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r95198208 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -52,7 +55,36 @@ private[spark] class TaskDescription( val addedFiles: Map[String, Long], val addedJars: Map[String, Long], val properties: Properties, -val serializedTask: ByteBuffer) { +private var serializedTask_ : ByteBuffer, +private var task_ : Task[_] = null) extends Logging { --- End diff -- rather than having `null` be a default, I think it might be better to pass it explicitly in `decode`, where you can also comment on why you do that, rather than do the deserialization immediately.
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r95198044 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -52,7 +55,36 @@ private[spark] class TaskDescription( val addedFiles: Map[String, Long], val addedJars: Map[String, Long], val properties: Properties, -val serializedTask: ByteBuffer) { +private var serializedTask_ : ByteBuffer, +private var task_ : Task[_] = null) extends Logging { + + def this( + taskId: Long, + attemptNumber: Int, + executorId: String, + name: String, + index: Int, // Index within this task's TaskSet + addedFiles: Map[String, Long], + addedJars: Map[String, Long], + properties: Properties, + task: Task[_]) { +this(taskId, attemptNumber, executorId, name, index, + addedFiles, addedJars, properties, null, task) + } + + lazy val serializedTask: ByteBuffer = { +if (serializedTask_ == null) { + serializedTask_ = try { +ByteBuffer.wrap(Utils.serialize(task_)) + } catch { +case NonFatal(e) => + val msg = s"Failed to serialize task $taskId, not attempting to retry it." + logError(msg, e) + throw new TaskNotSerializableException(e) + } +} +serializedTask_ --- End diff -- can you include a comment here about the purpose of this? Maybe just inside the `if (serialized_ == null)`, something like "This is where we serialize the task on the driver before sending it to the executor. This is not done when creating the TaskDescription so we can postpone this serialization to later in the scheduling process -- particularly, so it can happen in another thread by the CoarseGrainedSchedulerBackend. On the executors, this will already be populated by `decode`". Also, I feel like this would be clearer as a `def` rather than a `lazy val`. Since it's a lazy val I keep thinking this itself is going to be serialized, and then I have to remind myself it's just used by `encode` and the serialization is manually managed.
The only advantage of `lazy val` is automatic thread-safety, but that's irrelevant here.
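A minimal sketch of the `def`-plus-cached-field pattern suggested above, assuming plain Java serialization and a simplified class that holds either the task (driver side) or already-serialized bytes (executor side). `LazyTaskBytes` is a hypothetical stand-in, not the real `TaskDescription`; it omits the `TaskNotSerializableException` wrapping and, like the suggestion, is deliberately not thread-safe.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Hypothetical, simplified stand-in for TaskDescription: holds either the task
// (driver side) or its already-serialized bytes (executor side, set by decode).
class LazyTaskBytes(private var serialized: ByteBuffer, task: AnyRef) {

  def this(task: AnyRef) = this(null, task)

  // A plain def with a manually cached field: it reads as "compute on demand"
  // rather than as a field that is itself part of the serialized state.
  def serializedTask: ByteBuffer = {
    if (serialized == null) {
      // Driver side: serialize on first use, so the work happens in whichever
      // thread calls encode, not when the description is constructed.
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      try out.writeObject(task) finally out.close()
      serialized = ByteBuffer.wrap(bytes.toByteArray)
    }
    serialized
  }
}
```

On the executor side the buffer would be passed in directly (for example `new LazyTaskBytes(buf, null)`), so the serialization branch never runs there.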
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r95198725 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -149,7 +149,12 @@ private[spark] object Utils extends Logging { /** Deserialize an object using Java serialization and the given ClassLoader */ def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = { -val bis = new ByteArrayInputStream(bytes) +deserialize(ByteBuffer.wrap(bytes), loader) + } + + /** Deserialize an object using Java serialization and the given ClassLoader */ + def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = { +val bis = new ByteBufferInputStream(bytes) --- End diff -- for other reviewers: I was a little worried that maybe this was (ever-so-slightly) changing a really important code path, but it looks like this method is actually barely used (`SparkContext.objectFile` and `ReceivedBlockTracker.cleanupBatches`), so not a big deal.
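For anyone following along, here is a self-contained sketch of the shape of this change: the `Array[Byte]` overload simply wraps the array and delegates to a `ByteBuffer` overload, which reads through an `InputStream` view of the buffer. `SimpleByteBufferInputStream` and `SerDe` are hypothetical stand-ins (Spark's own `ByteBufferInputStream` is not reproduced here); the class-loader handling overrides `ObjectInputStream.resolveClass` in the usual way.

```scala
import java.io.{InputStream, ObjectInputStream, ObjectStreamClass}
import java.nio.ByteBuffer

// Minimal InputStream over a ByteBuffer (hypothetical stand-in for Spark's
// ByteBufferInputStream); it consumes the buffer's remaining bytes.
class SimpleByteBufferInputStream(buf: ByteBuffer) extends InputStream {
  override def read(): Int = if (buf.hasRemaining) buf.get() & 0xff else -1

  override def read(dest: Array[Byte], off: Int, len: Int): Int = {
    if (len == 0) 0
    else if (!buf.hasRemaining) -1
    else {
      val n = math.min(len, buf.remaining)
      buf.get(dest, off, n)
      n
    }
  }
}

object SerDe {
  // The Array[Byte] overload just wraps the array and delegates, as in the diff.
  def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T =
    deserialize(ByteBuffer.wrap(bytes), loader)

  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
    val ois = new ObjectInputStream(new SimpleByteBufferInputStream(bytes)) {
      // Resolve classes through the supplied loader instead of the default one.
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, loader)
    }
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }
}
```

Both overloads end up on the same path, which is why the change is behavior-preserving for the two call sites listed above.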
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r95199529 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -592,47 +579,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1) } - test("do not emit warning when serialized task is small") { --- End diff -- This may be a stretch, but `Task` *could* be big, right? E.g., if there was a long chain of partitions, and for some reason they had a lot of data? That seems unlikely, but I also have no idea whether it ever happens. It seems easy enough to add the check back in -- is there any reason not to?
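One way the removed check could come back wherever serialization now happens: a warn-once-per-task-set size guard. This is only a sketch; `TaskSizeWarner`, the 100 KB default threshold, and the message wording are illustrative assumptions, not code from this PR.

```scala
// Hypothetical "warn once per task set" guard for oversized serialized tasks.
// The default threshold and message text are assumptions for illustration.
class TaskSizeWarner(stageId: Int, thresholdKb: Long = 100) {
  private var emittedWarning = false

  /** Returns a warning message the first time a task exceeds the threshold. */
  def check(serializedSizeBytes: Long): Option[String] = {
    if (!emittedWarning && serializedSizeBytes > thresholdKb * 1024) {
      emittedWarning = true
      Some(s"Stage $stageId contains a task of very large size " +
        s"(${serializedSizeBytes / 1024} KB). " +
        s"The maximum recommended task size is $thresholdKb KB.")
    } else {
      None
    }
  }
}
```

Warning only once keeps a stage with thousands of large tasks from flooding the driver log. The caller would pass in the buffer size right after encoding, and log the message if one is returned.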
[GitHub] spark pull request #16376: [SPARK-18967][SCHEDULER] compute locality levels ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16376#discussion_r95419719 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -54,7 +54,7 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils} private[spark] class TaskSchedulerImpl private[scheduler]( val sc: SparkContext, val maxTaskFailures: Int, -blacklistTrackerOpt: Option[BlacklistTracker], +val blacklistTrackerOpt: Option[BlacklistTracker], --- End diff -- oh good point, this change is not necessary (must have been part of another change which I later reverted, sorry)
[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...
Github user squito commented on the issue: https://github.com/apache/spark/pull/15505 @witgo -- right, I'm saying that local mode should be designed to make debugging & development as easy as possible, not to maximize performance. That means we *should* do the extra serialization in local mode, to mimic the behavior of running on a cluster. It doesn't look to me like task serialization still happens in local mode. Tasks only get serialized inside `TaskDescription.encode`, but that is only called by `CoarseGrainedSchedulerBackend`. Before this change, task serialization happened in [`localScheduler.reviveOffers`](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala#L85), because it happened inside the `TaskSchedulerImpl`.
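A small illustration of why the extra serialization matters in local mode: if the task is round-tripped through Java serialization the way it would be when shipped to an executor, a non-serializable captured object fails immediately on a laptop instead of only on a cluster. `RoundTrip`, `Connection`, and `FakeTask` are hypothetical names used only for this sketch.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper: round-trip an object through Java serialization, as a
// cluster deployment would when shipping it to an executor.
object RoundTrip {
  def apply[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

// A class that is *not* Serializable, standing in for state a closure might capture.
class Connection

// A task-like payload that captures the non-serializable object.
case class FakeTask(id: Int, conn: Connection)
```

If local mode skips this step, `FakeTask(1, new Connection)` runs fine locally and only throws `java.io.NotSerializableException` once the job is deployed, which is the debugging experience we want to avoid.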
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r95423240 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -149,7 +149,12 @@ private[spark] object Utils extends Logging { /** Deserialize an object using Java serialization and the given ClassLoader */ def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = { -val bis = new ByteArrayInputStream(bytes) +deserialize(ByteBuffer.wrap(bytes), loader) + } + + /** Deserialize an object using Java serialization and the given ClassLoader */ + def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = { +val bis = new ByteBufferInputStream(bytes) --- End diff -- I think you misunderstood my comment. I agree, this change is fine. Its correctness is clear, and it is definitely covered by tests. My initial concern was that this might be some super-common code path, and so even a slight modification might affect performance (it should be optimized away by the JIT anyway, but I thought it was worth thinking about). Anyway, first of all it was a really minor concern, and second, it turned out to be totally irrelevant since it's *not* a commonly used method. I was just trying to save any other reviewers from tracking this down too. Sorry for the confusion :)
[GitHub] spark pull request #16376: [SPARK-18967][SCHEDULER] compute locality levels ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16376#discussion_r95424970 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -54,7 +54,7 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils} private[spark] class TaskSchedulerImpl private[scheduler]( val sc: SparkContext, val maxTaskFailures: Int, -blacklistTrackerOpt: Option[BlacklistTracker], +val blacklistTrackerOpt: Option[BlacklistTracker], --- End diff -- doh, actually this is used in tests. I override `createTaskSetManager` to use my manual clock. Another alternative is to allow the clock to get passed in to the `TaskSchedulerImpl` constructor, but that ends up being a bigger code change. I'll make it `private[scheduler] val`, slightly tighter than the implicit `private[spark]` it would be otherwise.
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r95433750 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -52,7 +55,36 @@ private[spark] class TaskDescription( val addedFiles: Map[String, Long], val addedJars: Map[String, Long], val properties: Properties, -val serializedTask: ByteBuffer) { +private var serializedTask_ : ByteBuffer, +private var task_ : Task[_] = null) extends Logging { --- End diff -- @kayousterhout I like that idea, but I took a brief look at how much would have to change, and it's a ton of changes, since `TaskSchedulerImpl.resourceOffers` would now be returning `TaskDescription`s and `Task`s, which then leads to small changes in a lot more places. I'm torn, but inclined to agree with @witgo.
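One way to read the `serializedTask_` / `task_` pair is "keep the task object on the driver and produce the bytes only when they are needed". A minimal sketch of that caching idea, with hypothetical names (`FakeTask`, `TaskHolder`); it is not the PR's implementation and ignores the executor side, where only the bytes exist.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Hypothetical serializable task payload.
final case class FakeTask(id: Long, payload: String)

final class TaskHolder(val taskId: Long, task: FakeTask) {
  private var encodeCount = 0 // exposed only so the caching can be checked

  // Serialized at most once, on first access (lazy val initialization is synchronized).
  private lazy val bytes: Array[Byte] = {
    encodeCount += 1
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(task)
    oos.close()
    bos.toByteArray
  }

  // Each caller gets an independent, read-only view of the cached bytes.
  def serializedTask: ByteBuffer = ByteBuffer.wrap(bytes).asReadOnlyBuffer()

  def timesEncoded: Int = encodeCount
}
```

Using a `lazy val` instead of a nullable `private var` keeps the "compute once" logic in one place; the trade-off discussed in the thread (a single class versus separate task and description types) is independent of this detail.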
[GitHub] spark pull request #16376: [SPARK-18967][SCHEDULER] compute locality levels ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16376#discussion_r95446452 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -819,4 +819,84 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!taskScheduler.hasExecutorsAliveOnHost("host0")) assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty) } + + test("Locality should be used for bulk offers even with delay scheduling off") { +// for testing, we create a task scheduler which lets us control how offers are shuffled +val conf = new SparkConf() + .set("spark.locality.wait", "0") +sc = new SparkContext("local", "TaskSchedulerImplSuite", conf) +// we create a manual clock just so we can be sure the clock doesn't advance at all in this test +val clock = new ManualClock() + +// We customize the task scheduler just to let us control the way offers are shuffled, so we +// can be sure we try both permutations, and to control the clock on the tasksetmanager. +val taskScheduler = new TaskSchedulerImpl(sc) { + override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = { +// Don't shuffle the offers around for this test. Instead, we'll just pass in all +// the permutations we care about directly. +offers + } + override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { +new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock) + } +} +// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. +new DAGScheduler(sc, taskScheduler) { + override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} + override def executorAdded(execId: String, host: String) {} +} +taskScheduler.initialize(new FakeSchedulerBackend) + + +// Make two different offers -- one in the preferred location, one that is not. 
+val offers = IndexedSeq( + WorkerOffer("exec1", "host1", 1), + WorkerOffer("exec2", "host2", 1) +) +Seq(false, true).foreach { swapOrder => + // Submit a taskset with locality preferences. + val taskSet = FakeTask.createTaskSet( +1, stageId = 1, stageAttemptId = 0, Seq(TaskLocation("host1", "exec1"))) + taskScheduler.submitTasks(taskSet) + val shuffledOffers = if (swapOrder) offers.reverse else offers + // Regardless of the order of the offers (after the task scheduler shuffles them), we should + // always take advantage of the local offer. + val taskDescs = taskScheduler.resourceOffers(shuffledOffers).flatten + withClue(s"swapOrder = $swapOrder") { +assert(taskDescs.size === 1) +assert(taskDescs.head.executorId === "exec1") + } +} + } + + test("With delay scheduling off, tasks can be run at any locality level immediately") { --- End diff -- yes, you are absolutely right. I've updated it and also added a check to make sure the tsm includes the lower locality levels.
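The property this test pins down, reduced to a toy model: with one task that prefers `host1` and two single-core offers, the task should land on `exec1` whichever order the offers arrive in. `Offer`, `PendingTask`, and `LocalitySketch.assign` are hypothetical simplifications, not `TaskSchedulerImpl`'s actual algorithm, which works through locality levels and delay scheduling.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for WorkerOffer and a task with a locality preference.
final case class Offer(executorId: String, host: String, cores: Int)
final case class PendingTask(id: Int, preferredHost: Option[String])

object LocalitySketch {
  // Scan all offers for a local match before falling back to any free offer,
  // so the outcome does not depend on the order of `offers`.
  def assign(tasks: Seq[PendingTask], offers: IndexedSeq[Offer]): Map[Int, String] = {
    val freeCores = mutable.Map[String, Int]()
    offers.foreach(o => freeCores(o.executorId) = o.cores)
    def hasCore(o: Offer): Boolean = freeCores(o.executorId) > 0

    tasks.flatMap { t =>
      val local = offers.find(o => hasCore(o) && t.preferredHost.contains(o.host))
      local.orElse(offers.find(hasCore)).map { o =>
        freeCores(o.executorId) -= 1
        t.id -> o.executorId
      }
    }.toMap
  }
}
```

Mirroring the test, both permutations are checked explicitly rather than relying on a random shuffle:

```scala
val offers = IndexedSeq(Offer("exec1", "host1", 1), Offer("exec2", "host2", 1))
Seq(false, true).foreach { swapOrder =>
  val ordered = if (swapOrder) offers.reverse else offers
  assert(LocalitySketch.assign(Seq(PendingTask(0, Some("host1"))), ordered) == Map(0 -> "exec1"))
}
```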
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r95448131 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -52,7 +55,36 @@ private[spark] class TaskDescription( val addedFiles: Map[String, Long], val addedJars: Map[String, Long], val properties: Properties, -val serializedTask: ByteBuffer) { +private var serializedTask_ : ByteBuffer, +private var task_ : Task[_] = null) extends Logging { --- End diff -- yeah, that leads to ~100 compile errors in test code. They're all small changes, but still, I'm not sure there is much value. I don't understand the comment on `LaunchTask` -- your commit does not change it, right? In any case, if you feel we should do that refactor, I am not opposed.
[GitHub] spark issue #16376: [SPARK-18967][SCHEDULER] compute locality levels even if...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16376 Jenkins, retest this please
[GitHub] spark pull request #16521: [SPARK-19139][core] New auth mechanism for transp...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16521#discussion_r95625631 --- Diff: common/network-common/src/main/java/org/apache/spark/network/crypto/README.md --- @@ -0,0 +1,158 @@ +Spark Auth Protocol and AES Encryption Support +== + +This file describes an auth protocol used by Spark as a more secure alternative to DIGEST-MD5. This +protocol is built on symmetric key encryption, based on the assumption that the two endpoints being +authenticated share a common secret, which is how Spark authentication currently works. The protocol +provides mutual authentication, meaning that after the negotiation both parties know that the remote +side knows the shared secret. The protocol is influenced by the ISO/IEC 9798 protocol, although it's +not an implementation of it. + +This protocol could be replaced with TLS PSK, except no PSK ciphers are available in the currently +released JREs. + +The protocol aims at solving the following shortcomings in Spark's current usage of DIGEST-MD5: + +- MD5 is an aging hash algorithm with known weaknesses, and a more secure alternative is desired. +- DIGEST-MD5 has a pre-defined set of ciphers for which it can generate keys. The only + viable, supported cipher these days is 3DES, and a more modern alternative is desired. +- Encrypting AES session keys with 3DES doesn't solve the issue, since the weakest link + in the negotiation would still be MD5 and 3DES. + +The protocol assumes that the shared secret is generated and distributed in a secure manner. + +The protocol always negotiates encryption keys. If encryption is not desired, the existing +SASL-based authentication, or no authentication at all, can be chosen instead. + +When messages are described below, it's expected that the implementation should support +arbitrary sizes for fields that don't have a fixed size. + +Client Challenge + + +The auth negotiation is started by the client. 
The client starts by generating an encryption +key based on the application's shared secret, and a nonce. + +KEY = KDF(SECRET, SALT, KEY_LENGTH) + +Where: +- KDF(): a key derivation function that takes a secret, a salt, a configurable number of + iterations, and a configurable key length. +- SALT: a byte sequence used to salt the key derivation function. +- KEY_LENGTH: length of the encryption key to generate. + + +The client generates a message with the following content: + +CLIENT_CHALLENGE = ( +APP_ID, +KDF, +ITERATIONS, +CIPHER, +KEY_LENGTH, +ANONCE, +ENC(APP_ID || ANONCE || CHALLENGE)) + +Where: + +- APP_ID: the application ID which the server uses to identify the shared secret. +- KDF: the key derivation function described above. +- ITERATIONS: number of iterations to run the KDF when generating keys. +- CIPHER: the cipher used to encrypt data. +- KEY_LENGTH: length of the encryption keys to generate, in bits. +- ANONCE: the nonce used as the salt when generating the auth key. +- ENC(): an encryption function that uses the cipher and the generated key. This function + will also be used in the definition of other messages below. +- CCHALLENGE: a byte sequence used as a challenge to the server. +- ||: concatenation operator. + +When strings are used where byte arrays are expected, the UTF-8 representation of the string +is assumed. + +To respond to the challenge, the server should consider the byte array as representing an +arbitrary-length integer, and respond with the value of the integer plus one. + + +Server Response And Challenge +- + +Once the client challenge is received, the server will generate the same auth key by +using the same algorithm the client has used. It will then verify the client challenge: +if the APP_ID and ANONCE fields match, the server knows that the client has the shared +secret. 
The server then creates a response to the client challenge, to prove that it also +has the secret key, and provides parameters to be used when creating the session key. + +The following describes the response from the server: + +SERVER_CHALLENGE = ( +ENC(APP_ID || ANONCE || RESPONSE), +ENC(SNONCE), +ENC(INIV), +ENC(OUTIV)) + +Where: + +- CRESPONSE: the server's response to the client challenge. +- SNONCE: a nonce to be used as salt when generating the session key. +- INIV: initialization vector used to initialize the input c
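The README leaves the KDF configurable. A minimal sketch of the `KEY = KDF(SECRET, SALT, KEY_LENGTH)` step, assuming PBKDF2 with HMAC-SHA256 from the standard JCE (`SecretKeyFactory`, `PBEKeySpec`) and passing the iteration count explicitly, as the KDF description and the `ITERATIONS` field imply. `KdfSketch` is a hypothetical name, and the secret and salts in the checks are made-up values.

```scala
import java.security.SecureRandom
import javax.crypto.SecretKeyFactory
import javax.crypto.spec.PBEKeySpec

object KdfSketch {
  private val random = new SecureRandom()

  // Random nonce (e.g. ANONCE), used as the KDF salt.
  def nonce(length: Int): Array[Byte] = {
    val bytes = new Array[Byte](length)
    random.nextBytes(bytes)
    bytes
  }

  // KEY = KDF(SECRET, SALT, KEY_LENGTH), with ITERATIONS as an explicit input.
  // Assumption: PBKDF2WithHmacSHA256 as the KDF.
  def deriveKey(secret: String, salt: Array[Byte], iterations: Int, keyLengthBits: Int): Array[Byte] = {
    val spec = new PBEKeySpec(secret.toCharArray, salt, iterations, keyLengthBits)
    try SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(spec).getEncoded
    finally spec.clearPassword()
  }
}
```

Because the client sends ANONCE, ITERATIONS, and KEY_LENGTH in the clear, the server can rerun `deriveKey` with its own copy of the secret and arrive at the same key.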
[GitHub] spark pull request #16521: [SPARK-19139][core] New auth mechanism for transp...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16521#discussion_r95625434 --- Diff: common/network-common/src/main/java/org/apache/spark/network/crypto/README.md --- @@ -0,0 +1,158 @@ +Spark Auth Protocol and AES Encryption Support +== + +This file describes an auth protocol used by Spark as a more secure alternative to DIGEST-MD5. This +protocol is built on symmetric key encryption, based on the assumption that the two endpoints being +authenticated share a common secret, which is how Spark authentication currently works. The protocol +provides mutual authentication, meaning that after the negotiation both parties know that the remote +side knows the shared secret. The protocol is influenced by the ISO/IEC 9798 protocol, although it's +not an implementation of it. + +This protocol could be replaced with TLS PSK, except no PSK ciphers are available in the currently +released JREs. + +The protocol aims at solving the following shortcomings in Spark's current usage of DIGEST-MD5: + +- MD5 is an aging hash algorithm with known weaknesses, and a more secure alternative is desired. +- DIGEST-MD5 has a pre-defined set of ciphers for which it can generate keys. The only + viable, supported cipher these days is 3DES, and a more modern alternative is desired. +- Encrypting AES session keys with 3DES doesn't solve the issue, since the weakest link + in the negotiation would still be MD5 and 3DES. + +The protocol assumes that the shared secret is generated and distributed in a secure manner. + +The protocol always negotiates encryption keys. If encryption is not desired, the existing +SASL-based authentication, or no authentication at all, can be chosen instead. + +When messages are described below, it's expected that the implementation should support +arbitrary sizes for fields that don't have a fixed size. + +Client Challenge + + +The auth negotiation is started by the client. 
The client starts by generating an encryption +key based on the application's shared secret, and a nonce. + +KEY = KDF(SECRET, SALT, KEY_LENGTH) + +Where: +- KDF(): a key derivation function that takes a secret, a salt, a configurable number of + iterations, and a configurable key length. +- SALT: a byte sequence used to salt the key derivation function. +- KEY_LENGTH: length of the encryption key to generate. + + +The client generates a message with the following content: + +CLIENT_CHALLENGE = ( +APP_ID, +KDF, +ITERATIONS, +CIPHER, +KEY_LENGTH, +ANONCE, +ENC(APP_ID || ANONCE || CHALLENGE)) + +Where: + +- APP_ID: the application ID which the server uses to identify the shared secret. +- KDF: the key derivation function described above. +- ITERATIONS: number of iterations to run the KDF when generating keys. +- CIPHER: the cipher used to encrypt data. +- KEY_LENGTH: length of the encryption keys to generate, in bits. +- ANONCE: the nonce used as the salt when generating the auth key. +- ENC(): an encryption function that uses the cipher and the generated key. This function + will also be used in the definition of other messages below. +- CCHALLENGE: a byte sequence used as a challenge to the server. +- ||: concatenation operator. + +When strings are used where byte arrays are expected, the UTF-8 representation of the string +is assumed. + +To respond to the challenge, the server should consider the byte array as representing an +arbitrary-length integer, and respond with the value of the integer plus one. + + +Server Response And Challenge +- + +Once the client challenge is received, the server will generate the same auth key by +using the same algorithm the client has used. It will then verify the client challenge: +if the APP_ID and ANONCE fields match, the server knows that the client has the shared +secret. 
The server then creates a response to the client challenge, to prove that it also +has the secret key, and provides parameters to be used when creating the session key. + +The following describes the response from the server: + +SERVER_CHALLENGE = ( +ENC(APP_ID || ANONCE || RESPONSE), +ENC(SNONCE), +ENC(INIV), +ENC(OUTIV)) + +Where: + +- CRESPONSE: the server's response to the client challenge. --- End diff -- typo: RESPONSE
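The README does not pin down ENC() beyond "the cipher and the generated key". A minimal sketch of how the server might build ENC(APP_ID || ANONCE || RESPONSE) and how the client could check the echoed prefix, assuming AES in CTR mode with a caller-supplied IV (an assumption: the README does not say at this point where the IV for these messages comes from). `EncSketch` and its methods are hypothetical.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Arrays
import javax.crypto.Cipher
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}

object EncSketch {
  // The README's || operator: plain byte concatenation (strings as UTF-8).
  def concat(parts: Array[Byte]*): Array[Byte] = parts.foldLeft(Array.emptyByteArray)(_ ++ _)

  // Assumption: AES/CTR/NoPadding; the real CIPHER is negotiated in CLIENT_CHALLENGE.
  private def run(mode: Int, key: Array[Byte], iv: Array[Byte], data: Array[Byte]): Array[Byte] = {
    val cipher = Cipher.getInstance("AES/CTR/NoPadding")
    cipher.init(mode, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
    cipher.doFinal(data)
  }

  def enc(key: Array[Byte], iv: Array[Byte], data: Array[Byte]): Array[Byte] =
    run(Cipher.ENCRYPT_MODE, key, iv, data)

  def dec(key: Array[Byte], iv: Array[Byte], data: Array[Byte]): Array[Byte] =
    run(Cipher.DECRYPT_MODE, key, iv, data)

  // Client-side check that the decrypted reply starts with APP_ID || ANONCE.
  def hasPrefix(decrypted: Array[Byte], appId: String, anonce: Array[Byte]): Boolean = {
    val prefix = concat(appId.getBytes(UTF_8), anonce)
    decrypted.length >= prefix.length && Arrays.equals(decrypted.take(prefix.length), prefix)
  }
}
```

CTR mode needs no padding, so the ciphertext has the same length as the plaintext; with CTR, a given key/IV pair must never be reused across messages.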
[GitHub] spark pull request #16521: [SPARK-19139][core] New auth mechanism for transp...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16521#discussion_r95624898 --- Diff: common/network-common/src/main/java/org/apache/spark/network/crypto/README.md --- @@ -0,0 +1,158 @@ +Spark Auth Protocol and AES Encryption Support +== + +This file describes an auth protocol used by Spark as a more secure alternative to DIGEST-MD5. This +protocol is built on symmetric key encryption, based on the assumption that the two endpoints being +authenticated share a common secret, which is how Spark authentication currently works. The protocol +provides mutual authentication, meaning that after the negotiation both parties know that the remote +side knows the shared secret. The protocol is influenced by the ISO/IEC 9798 protocol, although it's +not an implementation of it. + +This protocol could be replaced with TLS PSK, except no PSK ciphers are available in the currently +released JREs. + +The protocol aims at solving the following shortcomings in Spark's current usage of DIGEST-MD5: + +- MD5 is an aging hash algorithm with known weaknesses, and a more secure alternative is desired. +- DIGEST-MD5 has a pre-defined set of ciphers for which it can generate keys. The only + viable, supported cipher these days is 3DES, and a more modern alternative is desired. +- Encrypting AES session keys with 3DES doesn't solve the issue, since the weakest link + in the negotiation would still be MD5 and 3DES. + +The protocol assumes that the shared secret is generated and distributed in a secure manner. + +The protocol always negotiates encryption keys. If encryption is not desired, the existing +SASL-based authentication, or no authentication at all, can be chosen instead. + +When messages are described below, it's expected that the implementation should support +arbitrary sizes for fields that don't have a fixed size. + +Client Challenge + + +The auth negotiation is started by the client. 
The client starts by generating an encryption +key based on the application's shared secret, and a nonce. + +KEY = KDF(SECRET, SALT, KEY_LENGTH) + +Where: +- KDF(): a key derivation function that takes a secret, a salt, a configurable number of + iterations, and a configurable key length. +- SALT: a byte sequence used to salt the key derivation function. +- KEY_LENGTH: length of the encryption key to generate. + + +The client generates a message with the following content: + +CLIENT_CHALLENGE = ( +APP_ID, +KDF, +ITERATIONS, +CIPHER, +KEY_LENGTH, +ANONCE, +ENC(APP_ID || ANONCE || CHALLENGE)) + +Where: + +- APP_ID: the application ID which the server uses to identify the shared secret. +- KDF: the key derivation function described above. +- ITERATIONS: number of iterations to run the KDF when generating keys. +- CIPHER: the cipher used to encrypt data. +- KEY_LENGTH: length of the encryption keys to generate, in bits. +- ANONCE: the nonce used as the salt when generating the auth key. +- ENC(): an encryption function that uses the cipher and the generated key. This function + will also be used in the definition of other messages below. +- CCHALLENGE: a byte sequence used as a challenge to the server. --- End diff -- typo: CHALLENGE
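The rule "respond with the value of the integer plus one" can be sketched with `java.math.BigInteger`, treating the challenge as an unsigned big-endian integer. `ChallengeSketch` is hypothetical, and dropping `toByteArray`'s sign byte (so the response is not padded to the challenge's length) is an assumption the README leaves open.

```scala
import java.math.BigInteger

object ChallengeSketch {
  // Interpret the challenge as an unsigned big-endian integer and add one.
  def respond(challenge: Array[Byte]): Array[Byte] = {
    val plusOne = new BigInteger(1, challenge).add(BigInteger.ONE).toByteArray
    // toByteArray is two's complement and may prepend a 0x00 sign byte; drop it.
    if (plusOne.length > 1 && plusOne(0) == 0) plusOne.drop(1) else plusOne
  }

  // Client side: the server's reply is valid if it equals challenge + 1.
  def isValidResponse(challenge: Array[Byte], response: Array[Byte]): Boolean =
    java.util.Arrays.equals(respond(challenge), response)
}
```

For example, the challenge bytes `0x00 0xFF` (255) yield `0x01 0x00` (256). Since the encoded length can grow on overflow or shrink when the challenge has leading zero bytes, both sides would need to agree on this encoding.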
[GitHub] spark pull request #16521: [SPARK-19139][core] New auth mechanism for transp...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16521#discussion_r95661186 --- Diff: docs/configuration.md --- @@ -1625,40 +1625,40 @@ Apart from these, the following properties are also available, and may be useful - spark.authenticate.enableSaslEncryption + spark.network.crypto.enabled false -Enable encrypted communication when authentication is -enabled. This is supported by the block transfer service and the -RPC endpoints. +Enable encryption using the commons-crypto library for RPC and block transfer service. +Requires spark.authenticate to be enabled. --- End diff -- if `spark.authenticate=false`, what happens if this is `true`? It looks like it is just ignored. I think failing fast would be ideal.
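A minimal sketch of the fail-fast check suggested here: validate the two settings together at startup instead of silently ignoring `spark.network.crypto.enabled`. It reads a plain `Map[String, String]` rather than `SparkConf`, and `CryptoConfCheck` is a hypothetical name.

```scala
object CryptoConfCheck {
  private def isTrue(conf: Map[String, String], key: String): Boolean =
    conf.get(key).exists(_.trim.equalsIgnoreCase("true"))

  // Throws IllegalArgumentException instead of silently ignoring the setting.
  def validate(conf: Map[String, String]): Unit =
    require(
      !isTrue(conf, "spark.network.crypto.enabled") || isTrue(conf, "spark.authenticate"),
      "spark.network.crypto.enabled=true requires spark.authenticate=true")
}
```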
[GitHub] spark pull request #16346: [SPARK-16654][CORE] Add UI coverage for Applicati...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16346#discussion_r95725456 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -88,6 +88,86 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M new TaskSetBlacklist(conf, stageId, clock) } + def configureBlacklistAndScheduler(confs: (String, String)*): Unit = { +conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.BLACKLIST_ENABLED.key, "true") +confs.foreach { case (k, v) => conf.set(k, v) } + +clock.setTime(0) +listenerBusMock = mock[LiveListenerBus] +blacklist = new BlacklistTracker(listenerBusMock, conf, clock) + } + + test("Blacklisting individual tasks and checking for SparkListenerEvents") { +configureBlacklistAndScheduler() --- End diff -- I think the rest of the test code here has changed a lot since you first started working on this, but I'd avoid putting in a whole test case just for this. Instead, I'd look at the existing test cases, find all occurrences of `blacklist.isNodeBlacklisted` and `blacklist.isExecutorBlacklisted`, and add appropriate `verify(listenerBusMock)` calls around all of those.
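The suggestion is to assert on posted events right where the existing tests already check blacklist state. A minimal sketch of that pattern, using a hand-rolled recording bus in place of the Mockito `verify(listenerBusMock)` calls the comment refers to; `ExecutorBlacklistedEvent`, `RecordingBus`, and `ToyTracker` are hypothetical simplifications, not Spark's listener or tracker classes.

```scala
import scala.collection.mutable

// Hypothetical, simplified event and listener-bus types.
final case class ExecutorBlacklistedEvent(executorId: String, taskFailures: Int)

trait EventBus { def post(event: ExecutorBlacklistedEvent): Unit }

// Test double: records every posted event so a test can assert on them.
final class RecordingBus extends EventBus {
  val events = mutable.ArrayBuffer.empty[ExecutorBlacklistedEvent]
  def post(event: ExecutorBlacklistedEvent): Unit = events += event
}

// Hypothetical tracker: blacklists an executor after `maxFailures` failures,
// posting exactly one event per newly blacklisted executor.
final class ToyTracker(bus: EventBus, maxFailures: Int) {
  private val failures = mutable.Map.empty[String, Int].withDefaultValue(0)
  private val blacklisted = mutable.Set.empty[String]

  def recordFailure(executorId: String): Unit = {
    failures(executorId) += 1
    if (failures(executorId) >= maxFailures && blacklisted.add(executorId)) {
      bus.post(ExecutorBlacklistedEvent(executorId, failures(executorId)))
    }
  }

  def isExecutorBlacklisted(executorId: String): Boolean = blacklisted.contains(executorId)
}
```

Pairing every `isExecutorBlacklisted` assertion with a check on `events` (no event before the threshold, exactly one after, none on repeat failures) gives the same coverage as a dedicated test case.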
[GitHub] spark pull request #16346: [SPARK-16654][CORE] Add UI coverage for Applicati...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16346#discussion_r95724900 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -88,6 +88,86 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M new TaskSetBlacklist(conf, stageId, clock) } + def configureBlacklistAndScheduler(confs: (String, String)*): Unit = { --- End diff -- since you're never passing in `confs`, couldn't you get rid of this method, and just change the initialization in `beforeEach` to ``` listenerBusMock = mock[LiveListenerBus] blacklist = new BlacklistTracker(listenerBusMock, conf, clock) ```
[GitHub] spark pull request #16346: [SPARK-16654][CORE] Add UI coverage for Applicati...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16346#discussion_r95724311 --- Diff: core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala --- @@ -157,4 +158,42 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar } } + private def updateExecutorBlacklist(eid: String, isBlacklisted: Boolean): Unit = { +val execTaskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) +execTaskSummary.isBlacklisted = isBlacklisted + } + + override def onExecutorBlacklisted(executorBlacklisted: SparkListenerExecutorBlacklisted) + : Unit = synchronized { --- End diff -- nit: move the param to the second line and double-indent it (same for the other added methods in this file): ```scala override def onExecutorBlacklisted( executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = synchronized { ```
[GitHub] spark issue #16346: [SPARK-16654][CORE] Add UI coverage for Application Leve...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16346 cc @ajbozarth as you were interested in this earlier
[GitHub] spark issue #16346: [SPARK-16654][CORE] Add UI coverage for Application Leve...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16346 @ajbozarth whoops, I hadn't noticed the extra "Blacklisted" column in the summary table at the top -- I was thinking we'd add another row for blacklisted executors. But I actually think the current version is better. So ignore that comment :) (mentioned the same to @jsoltren directly)
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r96080614 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -52,7 +55,43 @@ private[spark] class TaskDescription( val addedFiles: Map[String, Long], val addedJars: Map[String, Long], val properties: Properties, -val serializedTask: ByteBuffer) { +private var serializedTask_ : ByteBuffer) extends Logging { --- End diff -- ah, that last one is doing the full refactor and fixing the 100+ compile errors :) Here is yet another version, which I think is closer to what Kay was suggesting: https://github.com/squito/spark/commit/389fec55de99e9f12104a8ba2b7aa54278aadd93 I agree with Kay that the `serializedTask` member var is a little confusing in the current version of this PR. I have no strong preference between the alternatives; the full separation is cleanest, but the change is pretty big (from a quick glance, I'm not sure all those test changes are correct, but I didn't look closely).
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r96082369 --- Diff: core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala --- @@ -17,11 +17,37 @@ package org.apache.spark.scheduler -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite} +import java.io.{IOException, NotSerializableException, ObjectInputStream, ObjectOutputStream} + +import org.apache.spark._ +import org.apache.spark.rdd.RDD import org.apache.spark.util.{RpcUtils, SerializableBuffer} -class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext { +class NotSerializablePartitionRDD( + sc: SparkContext, + numPartitions: Int) extends RDD[(Int, Int)](sc, Nil) with Serializable { --- End diff -- nit: double-indent params
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r96082959 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -244,32 +245,45 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { -val serializedTask = TaskDescription.encode(task) -if (serializedTask.limit >= maxRpcMessageSize) { - scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => -try { - var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + -"spark.rpc.message.maxSize (%d bytes). Consider increasing " + -"spark.rpc.message.maxSize or using broadcast variables for large values." - msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize) - taskSetMgr.abort(msg) -} catch { - case e: Exception => logError("Exception in error callback", e) -} - } +val serializedTask = try { + TaskDescription.encode(task) +} catch { + case NonFatal(e) => +abortTaskSetManager(scheduler, task.taskId, + s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e)) +null } -else { + +if (serializedTask != null && serializedTask.limit >= maxRpcMessageSize) { + val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + +"spark.rpc.message.maxSize (%d bytes). Consider increasing " + +"spark.rpc.message.maxSize or using broadcast variables for large values." + abortTaskSetManager(scheduler, task.taskId, +msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)) +} else if (serializedTask != null) { + if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) { --- End diff -- this nested if can be combined into the `else if`, right? 
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r96083494 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -244,32 +245,45 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { -val serializedTask = TaskDescription.encode(task) -if (serializedTask.limit >= maxRpcMessageSize) { - scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => -try { - var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + -"spark.rpc.message.maxSize (%d bytes). Consider increasing " + -"spark.rpc.message.maxSize or using broadcast variables for large values." - msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize) - taskSetMgr.abort(msg) -} catch { - case e: Exception => logError("Exception in error callback", e) -} - } +val serializedTask = try { + TaskDescription.encode(task) +} catch { + case NonFatal(e) => +abortTaskSetManager(scheduler, task.taskId, --- End diff -- hmm, what happens here if *one* of the tasks can't be serialized (or is too big, etc.)? We'll abort the taskset, but won't we still send out the `LaunchTask` events for all the other tasks? That wouldn't happen before. This may actually be a big problem -- we might lose the performance benefits if you first have to create all the serialized tasks to make sure they all work, and then send all the msgs. Maybe it's ok to still have previous tasks start, but it seems like we should at least prevent any *more* tasks from starting, and stop trying to serialize every other task.
I just modified the test case in `CoarseGrainedSchedulerBackendSuite` so it waits for multiple executors to come up before submitting the bad job, and the logs do show 2 instances of ``` 17/01/13 16:12:03.213 dispatcher-event-loop-3 ERROR TaskDescription: Failed to serialize task 2, not attempting to retry it. java.io.NotSerializableException at org.apache.spark.scheduler.NotSerializablePartitionRDD$$anonfun$getPartitions$1$$anon$1.writeObject(CoarseGrainedSchedulerBackendSuite.scala:38) ```
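One way to act on the suggestion above, sketched under assumptions. Once any task from a task set fails to serialize, that task set is treated as aborted, and its remaining tasks in the same batch are neither encoded nor sent. Tasks from other task sets still launch. `FailFastLaunch`, `Task`, and the `encode`/`send` callbacks are hypothetical stand-ins, not Spark APIs. Tasks that were already sent before the failure still start, which matches the "maybe it's ok to still have previous tasks start" trade-off.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

object FailFastLaunch {
  // Hypothetical stand-in for a TaskDescription: only the ids matter here.
  final case class Task(taskId: Long, taskSetId: String)

  // Returns the ids of the tasks that were actually sent.
  def launchTasks(
      tasks: Seq[Task],
      encode: Task => Array[Byte],
      send: (Task, Array[Byte]) => Unit): Seq[Long] = {
    val abortedTaskSets = mutable.HashSet[String]()
    val sent = mutable.ArrayBuffer[Long]()
    for (task <- tasks) {
      // Skip every remaining task of a task set that has already been aborted.
      if (!abortedTaskSets.contains(task.taskSetId)) {
        // Only serialization is guarded; a failure here marks the whole task set.
        val encoded: Option[Array[Byte]] =
          try Some(encode(task)) catch { case NonFatal(_) => None }
        encoded match {
          case Some(bytes) =>
            send(task, bytes)
            sent += task.taskId
          case None =>
            // The real backend would abort the TaskSetManager at this point.
            abortedTaskSets += task.taskSetId
        }
      }
    }
    sent.toList
  }
}
```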
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r96082685 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -517,6 +518,32 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } + test("unserializable partition") { +val shuffleMapRdd = new MyRDD(sc, 2, Nil) +val shuffleDep = new ShuffleDependency(shuffleMapRdd, new Partitioner { + override def numPartitions = 1 + + override def getPartition(key: Any) = 1 + + @throws(classOf[IOException]) + private def writeObject(out: ObjectOutputStream): Unit = { +throw new NotSerializableException() + } + + @throws(classOf[IOException]) + private def readObject(in: ObjectInputStream): Unit = {} +}) + +// Submit a map stage by itself +submitMapStage(shuffleDep) +assert(failure.getMessage.startsWith( + "Job aborted due to stage failure: Task not serializable")) +sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) +assert(sparkListener.failedStages.contains(0)) +assert(sparkListener.failedStages.size === 1) +assertDataStructuresEmpty() --- End diff -- I don't understand, what do you mean by "see above"? If you are referring to the "unserializable task" test -- good point, that test should be fixed too :)
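The test above relies on a standard Java serialization hook. A `Serializable` class whose private `writeObject` throws `NotSerializableException` fails whenever `ObjectOutputStream.writeObject` reaches it. This is a self-contained sketch of that trick outside Spark; `UnserializablePartitioner` and `SerializationCheck` are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, IOException, NotSerializableException, ObjectOutputStream}

// Claims to be Serializable, but its custom writeObject hook always fails,
// just like the anonymous Partitioner in the test.
class UnserializablePartitioner extends Serializable {
  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = {
    throw new NotSerializableException()
  }
}

object SerializationCheck {
  // True if `obj` can be written with plain Java serialization.
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```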
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r96082178 --- Diff: core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala --- @@ -38,4 +64,17 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo assert(smaller.size === 4) } + test("Scheduler aborts stages that have unserializable partition") { +val conf = new SparkConf() + .setMaster("local-cluster[2, 1, 1024]") + .setAppName("test") + .set("spark.dynamicAllocation.testing", "true") +sc = new SparkContext(conf) +val myRDD = new NotSerializablePartitionRDD(sc, 2) +val e = intercept[SparkException] { + myRDD.count() +} +assert(e.getMessage.contains("Failed to serialize task")) + --- End diff -- you should add a check here that, after this, you can still submit OK tasks, e.g. just `sc.parallelize(1 to 10).count()` (similar to Kay's comment about `DAGSchedulerSuite`).
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r96082398 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -602,6 +616,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp Future.successful(false) } -private[spark] object CoarseGrainedSchedulerBackend { +private[spark] object CoarseGrainedSchedulerBackend extends Logging { val ENDPOINT_NAME = "CoarseGrainedScheduler" + // abort TaskSetManager without exception + def abortTaskSetManager( +scheduler: TaskSchedulerImpl, +taskId: Long, +msg: => String, +exception: Option[Throwable] = None): Unit = { --- End diff -- nit: double indent params
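For reference, the "double indent" nit refers to the Scala style convention Spark follows: when a parameter list wraps, the parameters are indented 4 spaces so they stand apart from the 2-space method body. Below is a small sketch in that style. `AbortHelpers.formatAbortMessage` is a hypothetical helper that only mirrors the shape of `abortTaskSetManager`, with a by-name `msg` and an optional cause.

```scala
object AbortHelpers {
  // Parameters get 4 spaces; the body below gets the usual 2.
  def formatAbortMessage(
      taskId: Long,
      msg: => String,
      exception: Option[Throwable] = None): String = {
    // msg is by-name, so it is only built when this line runs.
    val cause = exception.map(e => s" (cause: ${e.getClass.getSimpleName})").getOrElse("")
    s"Task $taskId: $msg$cause"
  }
}
```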
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15505#discussion_r96105066 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala --- @@ -52,7 +55,43 @@ private[spark] class TaskDescription( val addedFiles: Map[String, Long], val addedJars: Map[String, Long], val properties: Properties, -val serializedTask: ByteBuffer) { +private var serializedTask_ : ByteBuffer) extends Logging { --- End diff -- @witgo yes, feel free to pull that change into this pr
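A note on the `serializedTask_ : ByteBuffer` declaration in the diff: the space before the colon is required. Scala lexes `serializedTask_:` as a single identifier (letters, an underscore, then operator characters), so without the space there is no type ascription. The sketch below shows the private-backing-field pattern with a public accessor. `LazyTaskBytes` and its encode-on-first-use behaviour are hypothetical; they are not necessarily what `TaskDescription` does.

```scala
import java.nio.ByteBuffer

// Hypothetical class: a private mutable field with a trailing underscore,
// exposed through an accessor with the plain name.
// Note the spaces in `serializedTask_ :` and `serializedTask_ =`; without them
// the underscore would glue onto the operator characters.
class LazyTaskBytes(private var serializedTask_ : ByteBuffer, encode: () => ByteBuffer) {
  // Encodes on first access if no buffer was supplied up front.
  def serializedTask: ByteBuffer = {
    if (serializedTask_ == null) {
      serializedTask_ = encode()
    }
    serializedTask_
  }
}
```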
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81859662 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +private[scheduler] object BlacklistTracker extends Logging { + + private val DEFAULT_TIMEOUT = "1h" + + /** + * Returns true if the blacklist is enabled, based on checking the configuration in the following + * order: + * 1. Is it specifically enabled or disabled? + * 2. Is it enabled via the legacy timeout conf? + * 3. 
Use the default for the spark-master: + * - off for local mode + * - on for distributed modes (including local-cluster) + */ + def isBlacklistEnabled(conf: SparkConf): Boolean = { +conf.get(config.BLACKLIST_ENABLED) match { + case Some(isEnabled) => +isEnabled + case None => +// if they've got a non-zero setting for the legacy conf, always enable the blacklist, +// otherwise, use the default based on the cluster-mode (off for local-mode, on otherwise). +val legacyKey = config.BLACKLIST_LEGACY_TIMEOUT_CONF.key +conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF) match { + case Some(legacyTimeout) => +if (legacyTimeout == 0) { + logWarning(s"Turning off blacklisting due to legacy configuaration:" + +s" $legacyKey == 0") + false +} else { + // mostly this is necessary just for tests, since real users that want the blacklist + // will get it anyway by default + logWarning(s"Turning on blacklisting due to legacy configuration:" + +s" $legacyKey > 0") + true +} + case None => +// local-cluster is *not* considered local for these purposes, we still want the +// blacklist enabled by default +!Utils.isLocalMaster(conf) +} +} + } + + def getBlacklistTimeout(conf: SparkConf): Long = { +conf.get(config.BLACKLIST_TIMEOUT_CONF).getOrElse { + conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF).getOrElse { +Utils.timeStringAsMs(DEFAULT_TIMEOUT) + } +} + } + + /** + * Verify that blacklist configurations are consistent; if not, throw an exception. Should only + * be called if blacklisting is enabled. + * + * The configuration for the blacklist is expected to adhere to a few invariants. Default + * values follow these rules of course, but users may unwittingly change one configuration + * without making the corresponding adjustment elsewhere. This ensures we fail-fast when + * there are such misconfigurations. 
+ */ + def validateBlacklistConfs(conf: SparkConf): Unit = { + +def mustBePos(k: String, v: String): Unit = { + throw new IllegalArgumentException(s"$k was $v, but must be > 0.") +} + +// undocumented escape hatch for validation -- just for tests that want to run in an "unsafe" +// configuration. +if (!conf.get("spark.blacklist.testing.skipValidation", "false").toBoolean) { + + Seq( +config.MAX_TASK_ATTEMPTS_PER_EXECUTOR, +config.MAX_TASK_ATTEMPTS_PER_NODE, +config.MAX_FAILURES_PER_EXEC_STAGE, +config.MAX_FAILED_EXEC_PER_NODE_STAGE + ).foreach { config => +val v = conf.get(config) +if (v <= 0) { + mustBeP
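The resolution order documented in `isBlacklistEnabled` can be sketched without Spark: the explicit flag wins, then the legacy timeout (where 0 disables), then a default based on the master. The same sketch covers the positivity check in `validateBlacklistConfs`. It assumes a plain `Map[String, String]` in place of `SparkConf`, a legacy timeout given in milliseconds, and the key names shown, all of which are assumptions. The real code reads typed `ConfigEntry` values from `org.apache.spark.internal.config`. `BlacklistConfSketch` and its members are hypothetical.

```scala
object BlacklistConfSketch {
  // Assumed key names, for illustration only.
  val EnabledKey = "spark.blacklist.enabled"
  val LegacyTimeoutKey = "spark.scheduler.executorTaskBlacklistTime"

  // "local" and "local[N]" count as local; "local-cluster[...]" deliberately does not.
  def isLocalMaster(master: String): Boolean =
    master == "local" || master.startsWith("local[")

  // 1. explicit flag, 2. legacy timeout (0 disables), 3. default by master.
  def isBlacklistEnabled(conf: Map[String, String], master: String): Boolean = {
    conf.get(EnabledKey) match {
      case Some(flag) => flag.toBoolean
      case None =>
        conf.get(LegacyTimeoutKey) match {
          case Some(timeoutMs) => timeoutMs.toLong != 0L
          case None => !isLocalMaster(master)
        }
    }
  }

  // Every listed limit must be strictly positive; fail fast otherwise.
  def validatePositive(limits: Map[String, Int]): Unit = {
    limits.foreach { case (key, value) =>
      if (value <= 0) {
        throw new IllegalArgumentException(s"$key was $value, but must be > 0.")
      }
    }
  }
}
```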
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81861857 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler + +import scala.collection.mutable.HashMap + +/** + * Small helper for tracking failed tasks for blacklisting purposes. Info on all failures for one + * task set, within one task set. + */ +class ExecutorFailuresInTaskSet(val node: String) { --- End diff -- I'd like to push back on this and keep this naming. I think I actually came up with this name from an earlier comment of yours about being confused about some variables and asking for more particular names :) (maybe it was for other variables). The naming could just be on instance variables and not on the class itself, of course, but I think it's helpful in this case to have it on the class. I found that even I got easily confused about the purpose of various little helper classes when I had to come back to this patch after a couple of days.
Where it's used generally told me more about the important design tradeoffs -- e.g., if it's in a taskset, it'll get dropped when the taskset completes, which automatically takes care of memory leaks. Maybe at some point in the future it'll be used for something else, but then it seems like that change could do the appropriate renaming. I feel like the name would also make it clear to that later change that it needs to check existing assumptions about the scope of this helper. It doesn't work as a nested class in the larger blacklist patch as I currently have it, so since you don't feel strongly I'll just leave it at the top level (but I will make it `private[scheduler]`, which it should have been already).
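To make the scoping argument concrete: a helper like `ExecutorFailuresInTaskSet` only needs to answer per-task questions about one executor. Because it is only reachable from one task set's bookkeeping, it is garbage-collected together with that task set. This is a hypothetical sketch; `ExecutorFailuresSketch` and its members are illustrative names, not the PR's actual fields.

```scala
import scala.collection.mutable.HashMap

// Failures of one executor, within one task set only.
class ExecutorFailuresSketch(val node: String) {
  // task index -> number of failed attempts of that task on this executor
  private val taskToFailureCount = HashMap[Int, Int]()

  def recordFailure(taskIndex: Int): Unit = {
    taskToFailureCount(taskIndex) = taskToFailureCount.getOrElse(taskIndex, 0) + 1
  }

  def failuresForTask(taskIndex: Int): Int = taskToFailureCount.getOrElse(taskIndex, 0)

  // Number of distinct tasks that have failed at least once on this executor.
  def numUniqueTasksWithFailures: Int = taskToFailureCount.size
}
```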
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81864312 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -98,6 +84,14 @@ private[spark] class TaskSetManager( var totalResultSize = 0L var calculatedTasks = 0 + val taskSetBlacklistOpt: Option[TaskSetBlacklist] = { --- End diff -- yeah, the "taskSet" is there because this code needs both in the later PR. The taskSetManager needs to communicate some info back to the app-level blacklist, so it needs to work with both. I could ignore it here and we can deal w/ that naming issue when we get there, if you want. I go back and forth on liking the "Opt" suffix ... I'm happy to go either way, but I often like "Opt" so that the naming & meaning are clear for code like ```scala fooOpt.foreach { foo => ... } ``` e.g. https://github.com/squito/spark/blob/taskset_blacklist_only/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L594 The naming pattern is used elsewhere in Spark: ``` > find .
-name "*.scala" | xargs grep "val .*Opt =" ./core/src/main/scala/org/apache/spark/MapOutputTracker.scala:val arrayOpt = mapStatuses.get(shuffleId) ./core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala: private val underlyingMethodOpt = { ./core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala: val jobOpt = statusToJobs.flatMap(_._2).find { jobInfo => jobInfo.jobId == jobId} ./core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala: val completionTimeOpt = jobUIData.completionTime ./core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala:val metricsOpt = taskUIData.metrics ./examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala: val numIterOpt = options.remove("numIter").map(_.toInt) ./external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala: val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads, ./graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala: val activeDirectionOpt = activeSetOpt.map(_._2) ./sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala: val intOpt = ctx.freshName("intOpt") ./sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala: val longOpt = ctx.freshName("longOpt") ./sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala: val newConditionOpt = conditionOpt match { ./streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala: private val serializableConfOpt = conf.map(new SerializableConfiguration(_)) ./yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala: val hostOpt = allocatedContainerToHostMap.get(containerId) ``` But all that said, I don't feel strongly; I just wanted to explain what I was thinking. If you still want a change, that's totally fine.
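A minimal illustration of the naming pattern defended above. The `Opt` suffix marks the `Option`, and the unwrapped value gets the plain name inside `foreach`, so the helper is simply skipped when the feature is off. `OptNaming`, `Blacklist`, and `record` are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

object OptNaming {
  // Hypothetical helper that only exists when a feature is enabled.
  final class Blacklist {
    val failures = ArrayBuffer[String]()
    def recordFailure(exec: String): Unit = {
      failures += exec
    }
  }

  // `blacklistOpt` is the Option; inside foreach the value is just `blacklist`.
  def record(blacklistOpt: Option[Blacklist], exec: String): Unit = {
    blacklistOpt.foreach { blacklist =>
      blacklist.recordFailure(exec)
    }
  }
}
```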
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81864626 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -478,16 +473,18 @@ private[spark] class TaskSetManager( // a good proxy to task serialization time. // val timeTaken = clock.getTime() - startTime val taskName = s"task ${info.id} in stage ${taskSet.id}" - logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," + -s" $taskLocality, ${serializedTask.limit} bytes)") + logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " + +s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)") sched.dagScheduler.taskStarted(task, info) - return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId, + Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId, --- End diff -- oh of course, thanks :)
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81865871 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -766,9 +782,11 @@ private[spark] class TaskSetManager( logWarning(failureReason) None } -// always add to failed executors -failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()). - put(info.executorId, clock.getTimeMillis()) + +if (reason.countTowardsTaskFailures) { --- End diff -- `failedExecutors` was the variable for the old blacklisting mechanism; it's completely gone now. Despite the name, it had nothing to do with executors that have failed completely:
```scala
// key is taskId (aka TaskInfo.index), value is a Map of executor id to when it failed
private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()
```
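For comparison, the removed bookkeeping behaved roughly like the sketch below. It was keyed by task index, and each entry mapped an executor id to the time that task last failed there, which is why the name `failedExecutors` was misleading. The class and method names are hypothetical; only the `HashMap[Int, HashMap[String, Long]]` shape and the `getOrElseUpdate(...).put(...)` call come from the diff.

```scala
import scala.collection.mutable.HashMap

class LegacyFailedExecutors {
  // task index -> (executor id -> time of the most recent failure of that task there)
  val failedExecutors = new HashMap[Int, HashMap[String, Long]]()

  // The old code recorded every failure unconditionally, in this shape.
  def recordFailure(index: Int, executorId: String, nowMillis: Long): Unit = {
    failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).put(executorId, nowMillis)
  }
}
```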
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81867147 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -766,9 +782,11 @@ private[spark] class TaskSetManager( logWarning(failureReason) None } -// always add to failed executors -failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()). - put(info.executorId, clock.getTimeMillis()) + +if (reason.countTowardsTaskFailures) { --- End diff -- The question about moving this inside the `if` on line 801 is a good one. I guess I left the logic here to be more like it was before. Honestly, I find `isZombie` super-confusing; I always have to think hard about exactly what it means. I guess your point is: if the taskset is a zombie, we're not going to schedule any more tasks from it, so there's no point in tracking more failures? And furthermore, you can't "un-zombie" a taskset? The only case I can think of where the logic would actually differ is with speculative execution, when you get task failures after the taskset completes (not from `TaskCommitDenied`, but real task failures). It doesn't matter at all for this PR, but it might matter in the context of app-level blacklisting, if you want to learn about bad executors, e.g., maybe an executor first has stragglers, and then eventually fails things. The current code I have for app-level blacklisting wouldn't take advantage of this anyway, since it just gets blacklisting info from a taskset when it succeeds, and that's it. Anyway, for now I will go ahead and put it inside that block, but I wanted to share my thoughts since it's a little subtle and worth talking through.
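A sketch of the ordering agreed on above, assuming the block around line 801 is guarded by `!isZombie`. A failure only updates the blacklist and the failure count if the task set is not a zombie and the reason counts towards task failures. `TaskSetFailureBookkeeping` and `FailureReason` are hypothetical simplifications, and setting `isZombie` stands in for aborting. As the comment notes, the trade-off is that failures arriving after the task set has become a zombie (for example from speculative copies) are dropped, so a later app-level blacklist would not learn from them.

```scala
import scala.collection.mutable

// Hypothetical failure reason carrying only the flag that matters here.
final case class FailureReason(countTowardsTaskFailures: Boolean)

class TaskSetFailureBookkeeping(numTasks: Int, maxTaskFailures: Int) {
  var isZombie = false
  val numFailures = new Array[Int](numTasks)
  // (executorId, task index) pairs that would be fed to the task-set blacklist.
  val blacklistUpdates = mutable.ArrayBuffer[(String, Int)]()

  def handleFailedTask(index: Int, executorId: String, reason: FailureReason): Unit = {
    // A zombie task set schedules nothing more, so its failures are ignored entirely.
    if (!isZombie && reason.countTowardsTaskFailures) {
      blacklistUpdates += ((executorId, index))
      numFailures(index) += 1
      if (numFailures(index) >= maxTaskFailures) {
        isZombie = true // stands in for abort(...)
      }
    }
  }
}
```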
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81867460 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala --- @@ -51,37 +54,67 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM assertDataStructuresEmpty(noFailure = false) } - // even with the blacklist turned on, if maxTaskFailures is not more than the number - // of executors on the bad node, then locality preferences will lead to us cycling through - // the executors on the bad node, and still failing the job + // even with the blacklist turned on, bad configs can lead to job failure. To survive one + // bad node, you need to make sure that + // maxTaskFailures > min(spark.blacklist.task.maxTaskAttemptsPerNode, nExecutorsPerHost) testScheduler( "With blacklist on, job will still fail if there are too many bad executors on bad host", extraConfs = Seq( - // set this to something much longer than the test duration so that executors don't get - // removed from the blacklist during the test - ("spark.scheduler.executorTaskBlacklistTime", "1000") + config.BLACKLIST_ENABLED.key -> "true", + config.MAX_TASK_ATTEMPTS_PER_NODE.key -> "5", + config.MAX_TASK_FAILURES.key -> "4", + "spark.testing.nHosts" -> "2", + "spark.testing.nExecutorsPerHost" -> "5", + "spark.testing.nCoresPerExecutor" -> "10", + // Blacklisting will normally immediately complain that this config is invalid -- the point + // of this test is to expose that the configuration is unsafe, so skip the validation. + "spark.blacklist.testing.skipValidation" -> "true" --- End diff -- there are separate checks for the validation (`BlacklistTrackerSuite` "check blacklist configuration invariants") so I'll just delete this test.
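The invariant stated in the removed test's comment can be written as a one-line check. `BlacklistInvariant.survivesOneBadNode` is a hypothetical helper that encodes only that inequality. With the test's settings (`maxTaskFailures` 4, `maxTaskAttemptsPerNode` 5, 5 executors per host) it reports the configuration as unsafe, which is exactly why the test had to skip validation.

```scala
object BlacklistInvariant {
  // From the diff comment: to survive one bad node,
  // maxTaskFailures > min(maxTaskAttemptsPerNode, nExecutorsPerHost).
  def survivesOneBadNode(
      maxTaskFailures: Int,
      maxTaskAttemptsPerNode: Int,
      nExecutorsPerHost: Int): Boolean =
    maxTaskFailures > math.min(maxTaskAttemptsPerNode, nExecutorsPerHost)
}
```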
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81868412 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config + +class BlacklistTrackerSuite extends SparkFunSuite { + + test("blacklist still respects legacy configs") { +val legacyKey = config.BLACKLIST_LEGACY_TIMEOUT_CONF.key + +{ + val localConf = new SparkConf().setMaster("local") + assert(!BlacklistTracker.isBlacklistEnabled(localConf)) + localConf.set(legacyKey, "5000") + assert(BlacklistTracker.isBlacklistEnabled(localConf)) + assert(5000 === BlacklistTracker.getBlacklistTimeout(localConf)) + + localConf.set(legacyKey, "0") + assert(!BlacklistTracker.isBlacklistEnabled(localConf)) +} + +{ --- End diff -- Good point, but I'll also turn blacklisting off by default, so this particular test will just be reorganized a little.
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81893799 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -809,32 +816,65 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("Kill other task attempts when one attempt belonging to the same task succeeds") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) -val taskSet = FakeTask.createTaskSet(4) +val taskSet = FakeTask.createTaskSet(5) // Set the speculation multiplier to be 0 so speculative tasks are launched immediately sc.conf.set("spark.speculation.multiplier", "0.0") +sc.conf.set("spark.speculation.quantile", "0.6") val clock = new ManualClock() val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => task.metrics.internalAccums } // Offer resources for 4 tasks to start +val tasks = new ArrayBuffer[TaskDescription]() for ((k, v) <- List( "exec1" -> "host1", "exec1" -> "host1", +"exec1" -> "host1", "exec2" -> "host2", "exec2" -> "host2")) { val taskOption = manager.resourceOffer(k, v, NO_PREF) assert(taskOption.isDefined) val task = taskOption.get assert(task.executorId === k) + tasks += task } -assert(sched.startedTasks.toSet === Set(0, 1, 2, 3)) -// Complete the 3 tasks and leave 1 task in running +assert(sched.startedTasks.toSet === (0 until 5).toSet) +// Complete 3 tasks and leave 2 task in running for (id <- Set(0, 1, 2)) { manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id))) assert(sched.endedTasks(id) === Success) } +def runningTaskForIndex(index: Int): TaskDescription = { + val t = tasks.find { task => task.index == index && !sched.endedTasks.contains(task.taskId) } + t match { +case Some(x) => x +case None => + throw new RuntimeException(s"couldn't find index $index in " 
+ +s"tasks: ${tasks.map{t => t.index -> t.taskId}} with endedTasks:" + +s" ${sched.endedTasks.keys}") + } +} + +// have each of the running tasks fail 3 times (not enough to abort the stage) +(3 until 6).foreach { attempt => --- End diff -- It didn't *need* to, in the sense that the test would pass without these changes. I was concerned about my change to `TaskKilled.countTowardsTaskFailures` -- in particular, I wanted to make sure that when you have speculative tasks, cancelling speculative tasks doesn't lead to aborting the taskset. This wasn't tested at all before, so I expanded this test to cover that. So the additions here make sure there are 4 task "failures", but one of them is just a speculative task being canceled. The other changes to create 2 speculative tasks are also just for better coverage -- I wanted the first successful speculative task to *not* complete the taskset. Unfortunately, that made some of the bookkeeping more complicated. In hindsight, it's probably pointless to throw it all into the same test, so I'll break it out into another one. It'll need to do virtually everything the current test does, so it will be redundant from a coverage perspective, but hopefully the intent of the tests will be clearer.
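The property the expanded test protects can be stated in isolation: a killed speculative attempt should not count as a task failure, so 3 real failures plus 1 kill stays under a limit of 4. This is a hypothetical sketch with a minimal end-reason hierarchy. `SpeculationFailures` and its members are illustrative, although the `countTowardsTaskFailures` flag mirrors the one used in the `TaskSetManager` diff.

```scala
object SpeculationFailures {
  // Minimal stand-in for Spark's task end reasons.
  sealed trait EndReason { def countTowardsTaskFailures: Boolean }
  case object ExceptionFailure extends EndReason { val countTowardsTaskFailures = true }
  case object TaskKilled extends EndReason { val countTowardsTaskFailures = false }

  // Would the task set be aborted after these attempt endings for a single task?
  def wouldAbort(endings: Seq[EndReason], maxTaskFailures: Int): Boolean =
    endings.count(_.countTowardsTaskFailures) >= maxTaskFailures
}
```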
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81895140 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler + +import scala.collection.mutable.{HashMap, HashSet} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config +import org.apache.spark.internal.Logging +import org.apache.spark.util.Clock + +/** + * Handles blacklisting executors and nodes within a taskset. This includes blacklisting specific + * (task, executor) / (task, nodes) pairs, and also completely blacklisting executors and nodes + * for the entire taskset. + * + * THREADING: As a helper to [[TaskSetManager]], this class is designed to only be called from code + * with a lock on the TaskScheduler (e.g. its event handlers). It should not be called from other + * threads. 
+ */ +private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int, val clock: Clock) +extends Logging { + + private val MAX_TASK_ATTEMPTS_PER_EXECUTOR = conf.get(config.MAX_TASK_ATTEMPTS_PER_EXECUTOR) + private val MAX_TASK_ATTEMPTS_PER_NODE = conf.get(config.MAX_TASK_ATTEMPTS_PER_NODE) + private val MAX_FAILURES_PER_EXEC_STAGE = conf.get(config.MAX_FAILURES_PER_EXEC_STAGE) + private val MAX_FAILED_EXEC_PER_NODE_STAGE = conf.get(config.MAX_FAILED_EXEC_PER_NODE_STAGE) + private val TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf) + + /** + * A map from each executor to the task failures on that executor. + */ + val execToFailures: HashMap[String, ExecutorFailuresInTaskSet] = new HashMap() + + /** + * Map from node to all executors on it with failures. Needed because we want to know about + * executors on a node even after they have died. + */ + private val nodeToExecsWithFailures: HashMap[String, HashSet[String]] = new HashMap() + private val nodeToBlacklistedTasks: HashMap[String, HashSet[Int]] = new HashMap() + private val blacklistedExecs: HashSet[String] = new HashSet() + private val blacklistedNodes: HashSet[String] = new HashSet() + + /** + * Return true if this executor is blacklisted for the given task. This does *not* + * need to return true if the executor is blacklisted for the entire stage. + * That is to keep this method as fast as possible in the inner-loop of the + * scheduler, where those filters will have already been applied. 
+ */ + def isExecutorBlacklistedForTask( + executorId: String, + index: Int): Boolean = { +execToFailures.get(executorId) + .map { execFailures => +val count = execFailures.taskToFailureCountAndExpiryTime.get(index).map(_._1).getOrElse(0) +count >= MAX_TASK_ATTEMPTS_PER_EXECUTOR + } + .getOrElse(false) + } + + def isNodeBlacklistedForTask( + node: String, + index: Int): Boolean = { +nodeToBlacklistedTasks.get(node) + .map(_.contains(index)) + .getOrElse(false) + } + + /** + * Return true if this executor is blacklisted for the given stage. Completely ignores whether + * anything to do with the node the executor is on. That + * is to keep this method as fast as possible in the inner-loop of the scheduler, where those + * filters will already have been applied. + */ + def isExecutorBlacklistedForTaskSet(executorId: String): Boolean = { +blacklistedExecs.contains(executorId) + } + + def isNodeBlacklistedForTaskSet(node: String): Boolean = { +blacklistedNodes.contains(node) + } --- End diff -- I know it's verbose, but I'd prefer to keep it, especially once application-level blacklisting is added (https://github.com/apache/spark/pull/1407
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81896656 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +private[scheduler] object BlacklistTracker extends Logging { + + private val DEFAULT_TIMEOUT = "1h" + + /** + * Returns true if the blacklist is enabled, based on checking the configuration in the following + * order: + * 1. Is it specifically enabled or disabled? + * 2. Is it enabled via the legacy timeout conf? + * 3. 
Use the default for the spark-master: + * - off for local mode + * - on for distributed modes (including local-cluster) + */ + def isBlacklistEnabled(conf: SparkConf): Boolean = { +conf.get(config.BLACKLIST_ENABLED) match { + case Some(isEnabled) => +isEnabled + case None => +// if they've got a non-zero setting for the legacy conf, always enable the blacklist, +// otherwise, use the default based on the cluster-mode (off for local-mode, on otherwise). +val legacyKey = config.BLACKLIST_LEGACY_TIMEOUT_CONF.key +conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF) match { + case Some(legacyTimeout) => +if (legacyTimeout == 0) { + logWarning(s"Turning off blacklisting due to legacy configuaration:" + +s" $legacyKey == 0") + false +} else { + // mostly this is necessary just for tests, since real users that want the blacklist + // will get it anyway by default + logWarning(s"Turning on blacklisting due to legacy configuration:" + +s" $legacyKey > 0") + true +} + case None => +// local-cluster is *not* considered local for these purposes, we still want the +// blacklist enabled by default +!Utils.isLocalMaster(conf) +} +} + } + + def getBlacklistTimeout(conf: SparkConf): Long = { +conf.get(config.BLACKLIST_TIMEOUT_CONF).getOrElse { + conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF).getOrElse { +Utils.timeStringAsMs(DEFAULT_TIMEOUT) + } +} + } + + /** + * Verify that blacklist configurations are consistent; if not, throw an exception. Should only + * be called if blacklisting is enabled. + * + * The configuration for the blacklist is expected to adhere to a few invariants. Default + * values follow these rules of course, but users may unwittingly change one configuration + * without making the corresponding adjustment elsewhere. This ensures we fail-fast when + * there are such misconfigurations. 
+ */ + def validateBlacklistConfs(conf: SparkConf): Unit = { + +def mustBePos(k: String, v: String): Unit = { + throw new IllegalArgumentException(s"$k was $v, but must be > 0.") +} + +// undocumented escape hatch for validation -- just for tests that want to run in an "unsafe" +// configuration. +if (!conf.get("spark.blacklist.testing.skipValidation", "false").toBoolean) { + + Seq( +config.MAX_TASK_ATTEMPTS_PER_EXECUTOR, +config.MAX_TASK_ATTEMPTS_PER_NODE, +config.MAX_FAILURES_PER_EXEC_STAGE, +config.MAX_FAILED_EXEC_PER_NODE_STAGE + ).foreach { config => +val v = conf.get(config) +if (v <= 0) { + mustBeP
[GitHub] spark issue #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSets
Github user squito commented on the issue: https://github.com/apache/spark/pull/15249 @mridulm on the questions about expiry from blacklists, you are not missing anything: this explicitly does not do any timeouts at the taskset level (this is mentioned in the design doc). The timeout code you see is mostly incremental work toward https://github.com/apache/spark/pull/14079, but it doesn't actually add any value here. The primary motivation for blacklisting that I've seen is quite different from the use case you are describing: it's not to help deal with resource contention, but to deal with truly broken resources (a bad disk, in all the cases I can think of). In fact, in these cases 1 hour is really short; users probably want something more like 6-12 hours. But 1 hour isn't so bad; it just means the bad resources need to be "rediscovered" that often, with a scheduling hiccup while that happens. What you are describing is really different: it's a form of backoff to deal with resource contention. I have actually talked to a couple of different folks about doing something like this recently and think it would be great, though I see problems with this approach, since it still allows other tasks to be scheduled on those executors, and the timeout isn't relative to the task runtime, etc. Nonetheless, an issue here might be that the old option serves some purpose that is no longer supported. Do we need to add it back in? Just adding the timeout logic again is pretty easy, though (a) I need to figure out the right place to do it so that it doesn't impact scheduling performance, and, more importantly, (b) I really worry about being able to configure things so that blacklisting can actually handle totally broken resources. E.g., say that you set the timeout to 10s.
If your tasks take 1 minute each, then your one bad executor might cycle through the leftover tasks, fail them all, pass the timeout, and repeat that cycle a few times until you go over spark.task.maxFailures. I don't see a good way to deal with that while also setting a sensible timeout for the entire application. Two other workarounds: (2) Just enable the per-task timeout when the legacy configuration is used, and leave it undocumented. We don't change behavior then, but configuration is kind of a mess, and it'll be a headache to keep maintaining this. (3) Add a timeout just to *taskset*-level blacklisting. That is a behavior change from the existing blacklisting, which has a timeout per *task*, but it removes the interaction with spark.task.maxFailures that we always have to tiptoe around. I also think it might satisfy your use case even better. I still don't think it's a great solution to the problem, and we need something else to handle this sort of backoff better, so I don't feel great about shoving it into this feature. I'm thinking (3) is the best, but I'll give it a bit more thought. Also cc @kayousterhout @tgravescs @markhamstra for opinions, since this is a bigger design point to consider.
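A back-of-the-envelope model of the 10s-timeout scenario above. It is a deliberately simplified, hypothetical sketch: one task, one bad executor that fails every attempt after `taskSecs`, a per-(task, executor) blacklist entry that expires `timeoutSecs` after each failure, and a healthy executor that only frees up at `healthyFreeAtSecs`. `BlacklistTimeoutDemo` and its parameters are invented for illustration; real scheduling delays and other tasks are ignored.

```scala
object BlacklistTimeoutDemo {
  /**
   * Hypothetical model: returns how many failures the task accumulates on the
   * bad executor before a healthy executor can take it, capped at maxFailures
   * (reaching the cap means the task set would be aborted).
   */
  def failuresBeforeHealthyExecutor(
      taskSecs: Long,
      timeoutSecs: Long,
      healthyFreeAtSecs: Long,
      maxFailures: Int): Int = {
    var now = 0L
    var failures = 0
    // Each iteration launches one attempt on the bad executor, as long as the
    // attempt can start before the healthy executor becomes available.
    while (failures < maxFailures && now < healthyFreeAtSecs) {
      now += taskSecs    // the attempt runs for a while and then fails
      failures += 1
      now += timeoutSecs // the blacklist entry must expire before a retry
    }
    failures
  }
}
```

With 1-minute tasks, a healthy slot opening after 5 minutes and a limit of 4, a 10s timeout lets the bad executor reach 4 failures (at 60s, 130s, 200s and 270s) and abort the task set, whereas a 1h timeout stops after a single failure.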
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81898588 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +private[scheduler] object BlacklistTracker extends Logging { + + private val DEFAULT_TIMEOUT = "1h" --- End diff -- (longer top-level comment responding to this)
[GitHub] spark issue #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSets
Github user squito commented on the issue: https://github.com/apache/spark/pull/15249 @kayousterhout @mridulm thanks for the feedback. Obviously I still need to figure out the timeout issue, but otherwise I think I've addressed everything. I'll do another pass in the morning.
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81999277 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -592,34 +589,54 @@ private[spark] class TaskSetManager( * failures (this is because the method picks on unscheduled task, and then iterates through each * executor until it finds one that the task hasn't failed on already). */ - private[scheduler] def abortIfCompletelyBlacklisted(executors: Iterable[String]): Unit = { - -val pendingTask: Option[Int] = { - // usually this will just take the last pending task, but because of the lazy removal - // from each list, we may need to go deeper in the list. We poll from the end because - // failed tasks are put back at the end of allPendingTasks, so we're more likely to find - // an unschedulable task this way. - val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet => -copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet) - } - if (indexOffset == -1) { -None - } else { -Some(allPendingTasks(indexOffset)) - } -} + private[scheduler] def abortIfCompletelyBlacklisted( --- End diff -- Wait, why isn't the "most common scenario" still applicable? If you have two executors and 4 max failures, then after the task fails twice, you won't be able to schedule anything.
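The two-executor scenario can be made concrete with a small check. This is a hypothetical sketch, not the body of `abortIfCompletelyBlacklisted`: it assumes a per-executor limit of one failed attempt before a task is blacklisted there, and `CompletelyBlacklistedDemo` / `isCompletelyBlacklisted` are invented names. With two executors, two failures already exclude the task everywhere, while the task-set limit of 4 is never reached, so without an explicit check the task set would simply hang.

```scala
object CompletelyBlacklistedDemo {
  /**
   * Hypothetical check: `failures` maps executor id -> failed attempts of one
   * pending task on that executor. The task is blacklisted on an executor once
   * it has failed maxAttemptsPerExecutor times there; it is "completely
   * blacklisted" when that holds for every live executor.
   */
  def isCompletelyBlacklisted(
      executors: Seq[String],
      failures: Map[String, Int],
      maxAttemptsPerExecutor: Int): Boolean =
    executors.nonEmpty &&
      executors.forall(e => failures.getOrElse(e, 0) >= maxAttemptsPerExecutor)
}
```

For example, with executors `exec1` and `exec2`, one failure on each, and a per-executor limit of 1, the check returns true after only 2 of the 4 allowed failures.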
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r82007301 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -809,32 +816,65 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("Kill other task attempts when one attempt belonging to the same task succeeds") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) -val taskSet = FakeTask.createTaskSet(4) +val taskSet = FakeTask.createTaskSet(5) // Set the speculation multiplier to be 0 so speculative tasks are launched immediately sc.conf.set("spark.speculation.multiplier", "0.0") +sc.conf.set("spark.speculation.quantile", "0.6") val clock = new ManualClock() val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => task.metrics.internalAccums } // Offer resources for 4 tasks to start +val tasks = new ArrayBuffer[TaskDescription]() for ((k, v) <- List( "exec1" -> "host1", "exec1" -> "host1", +"exec1" -> "host1", "exec2" -> "host2", "exec2" -> "host2")) { val taskOption = manager.resourceOffer(k, v, NO_PREF) assert(taskOption.isDefined) val task = taskOption.get assert(task.executorId === k) + tasks += task } -assert(sched.startedTasks.toSet === Set(0, 1, 2, 3)) -// Complete the 3 tasks and leave 1 task in running +assert(sched.startedTasks.toSet === (0 until 5).toSet) +// Complete 3 tasks and leave 2 task in running for (id <- Set(0, 1, 2)) { manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id))) assert(sched.endedTasks(id) === Success) } +def runningTaskForIndex(index: Int): TaskDescription = { + val t = tasks.find { task => task.index == index && !sched.endedTasks.contains(task.taskId) } + t match { +case Some(x) => x +case None => + throw new RuntimeException(s"couldn't find index $index in " 
+ +s"tasks: ${tasks.map{t => t.index -> t.taskId}} with endedTasks:" + +s" ${sched.endedTasks.keys}") + } +} + +// have each of the running tasks fail 3 times (not enough to abort the stage) +(3 until 6).foreach { attempt => --- End diff -- After sleeping on this, I actually think it's worth updating the original test. It used to check only that a task was killed when the lingering copy belonged to the *final* task. That's a rather special case; it seems worth adding a check for the other tasks as well. That adds most of the complexity here (with two speculative tasks, you need the extra bookkeeping to figure out what you're dealing with). I could put it back in one test and change the test name to include both aspects being tested, maybe "Kill other task attempts when one attempt belonging to the same task succeeds, and ensure killed tasks do not count towards failing task set". Or I could make two different tests with virtually the same setup (with shared code) and just different asserts.
[GitHub] spark issue #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSets
Github user squito commented on the issue: https://github.com/apache/spark/pull/15249 I forgot to mention that I had turned off blacklisting by default; I agree with your suggestion, Kay. I pushed another commit that updates the docs as well. There are also some other small style fixes and a couple of added comments in there.
[GitHub] spark pull request #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure r...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15335#discussion_r82021050 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1255,27 +1255,46 @@ class DAGScheduler( s"longer running") } - if (disallowStageRetryForTest) { -abortStage(failedStage, "Fetch failure will not retry stage due to testing config", - None) - } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) { -abortStage(failedStage, s"$failedStage (${failedStage.name}) " + - s"has failed the maximum allowable number of " + - s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " + - s"Most recent failure reason: ${failureMessage}", None) - } else { -if (failedStages.isEmpty) { - // Don't schedule an event to resubmit failed stages if failed isn't empty, because - // in that case the event will already have been scheduled. - // TODO: Cancel running tasks in the stage - logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + -s"$failedStage (${failedStage.name}) due to fetch failure") - messageScheduler.schedule(new Runnable { -override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) + val shouldAbortStage = +failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) || +disallowStageRetryForTest + + if (shouldAbortStage) { +val abortMessage = if (disallowStageRetryForTest) { + "Fetch failure will not retry stage due to testing config" +} else { + s"""$failedStage (${failedStage.name}) + |has failed the maximum allowable number of + |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. + |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ") } +abortStage(failedStage, abortMessage, None) + } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued +// TODO: Cancel running tasks in the failed stage -- cf. 
SPARK-17064 +val noResubmitEnqueued = !failedStages.contains(failedStage) failedStages += failedStage failedStages += mapStage +if (noResubmitEnqueued) { + // We expect one executor failure to trigger many FetchFailures in rapid succession, + // but all of those task failures can typically be handled by a single resubmission of + // the failed stage. We avoid flooding the scheduler's event queue with resubmit + // messages by checking whether a resubmit is already in the event queue for the + // failed stage. If there is already a resubmit enqueued for a different failed + // stage, that event would also be sufficient to handle the current failed stage, but + // producing a resubmit for each failed stage makes debugging and logging a little + // simpler while not producing an overwhelming number of scheduler events. + logInfo( +s"Resubmitting $mapStage (${mapStage.name}) and " + +s"$failedStage (${failedStage.name}) due to fetch failure" + ) + messageScheduler.schedule( --- End diff -- I find myself frequently wondering about the purpose of this. It's commented very tersely on RESUBMIT_TIMEOUT, but I think it might be nice to add a longer comment here. I guess something like "If we get one fetch-failure, we often get more fetch failures across multiple executors. We will get better parallelism when we resubmit the mapStage if we can resubmit when we know about as many of those failures as possible. So this is a heuristic to add a *small* delay to see if we gather a few more failures before we resubmit." We do *not* need the delay to figure out exactly which shuffle-map outputs are gone on the executor -- we always [mark the executor as lost on a fetch failure](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1288), which means we mark all its map output as gone. (This is really confusing -- it *looks* like we only [remove the one shuffle
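The delay heuristic described in the suggested comment can be illustrated with a deterministic, logical-time model. This is a hypothetical sketch, not DAGScheduler code: `ResubmitBatchingDemo`, `FetchFailure` and `firstResubmit` are invented names, and `delayMs` merely stands in for `RESUBMIT_TIMEOUT` without assuming its actual value. The first fetch failure schedules one resubmit `delayMs` later, and every failure that arrives before it fires is folded into that resubmit.

```scala
object ResubmitBatchingDemo {
  // Hypothetical stand-in for a fetch failure reported from some executor.
  final case class FetchFailure(atMs: Long, executorId: String)

  /**
   * Returns (time the resubmit fires, executors known to be lost by then).
   * Failures arriving after that point would need a later resubmit.
   */
  def firstResubmit(
      failures: Seq[FetchFailure],
      delayMs: Long): Option[(Long, Set[String])] =
    failures.sortBy(_.atMs).headOption.map { first =>
      val fireAt = first.atMs + delayMs
      val lost = failures.filter(_.atMs <= fireAt).map(_.executorId).toSet
      (fireAt, lost)
    }
}
```

With failures from `exec1`..`exec4` arriving at 0, 50, 150 and 400 ms, a 200 ms delay lets the first resubmit already account for three lost executors, while a zero delay would resubmit knowing only about `exec1`.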
[GitHub] spark pull request #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure r...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/15335#discussion_r82010161 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1255,27 +1255,46 @@ class DAGScheduler( s"longer running") } - if (disallowStageRetryForTest) { -abortStage(failedStage, "Fetch failure will not retry stage due to testing config", - None) - } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) { -abortStage(failedStage, s"$failedStage (${failedStage.name}) " + - s"has failed the maximum allowable number of " + - s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " + - s"Most recent failure reason: ${failureMessage}", None) - } else { -if (failedStages.isEmpty) { - // Don't schedule an event to resubmit failed stages if failed isn't empty, because - // in that case the event will already have been scheduled. - // TODO: Cancel running tasks in the stage - logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + -s"$failedStage (${failedStage.name}) due to fetch failure") - messageScheduler.schedule(new Runnable { -override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) + val shouldAbortStage = +failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) || +disallowStageRetryForTest + + if (shouldAbortStage) { +val abortMessage = if (disallowStageRetryForTest) { + "Fetch failure will not retry stage due to testing config" +} else { + s"""$failedStage (${failedStage.name}) + |has failed the maximum allowable number of + |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. + |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ") } +abortStage(failedStage, abortMessage, None) + } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued +// TODO: Cancel running tasks in the failed stage -- cf. 
SPARK-17064 +val noResubmitEnqueued = !failedStages.contains(failedStage) --- End diff -- The name is slightly misleading, since there might be a Resubmit enqueued for another stage. Maybe `noResubmitEnqueuedForThisStage`? But perhaps that's just longer without adding any clarity; the comment below explains it pretty well, so I'm OK with the name, just thinking aloud. Also, I was wondering whether this should be `!failedStages.contains(failedStage) || !failedStages.contains(mapStage)`. Is there any scenario where it would contain `failedStage` but not `mapStage`? I couldn't come up with anything, but I also wonder whether there are enough weird scenarios we easily overlook that it might be better to keep it in just in case. We could also try to send even fewer Resubmit events: if the stage is already in `waitingStages`, we don't need to resubmit. But I'd prefer not to go that far, since it's always safe to over-Resubmit, and I'm worried we may overlook a case.
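How the `noResubmitEnqueued` guard in the diff above bounds the number of Resubmit events can be shown with a single-threaded model. This is a hypothetical sketch: `ResubmitGuard`, `onFetchFailure`, `processResubmit` and the `enqueued` counter are invented stand-ins, and integer ids replace Stage objects. A burst of fetch failures for the same stage enqueues one event; a failure for a different stage enqueues its own, which matches the per-stage logging rationale in the diff's comment.

```scala
import scala.collection.mutable

// Hypothetical, single-threaded model of the guard in the diff; stage ids
// stand in for Stage objects and `enqueued` counts scheduled resubmit events.
final class ResubmitGuard {
  val failedStages = mutable.HashSet.empty[Int]
  var enqueued = 0

  def onFetchFailure(failedStage: Int, mapStage: Int): Unit = {
    // Same check as the diff: only enqueue if this failed stage is not
    // already waiting on a resubmit.
    val noResubmitEnqueued = !failedStages.contains(failedStage)
    failedStages += failedStage
    failedStages += mapStage
    if (noResubmitEnqueued) enqueued += 1
  }

  // Handling the event resubmits everything in failedStages and clears it,
  // so a later failure of the same stage enqueues a fresh event.
  def processResubmit(): Set[Int] = {
    val resubmitted = failedStages.toSet
    failedStages.clear()
    resubmitted
  }
}
```

For example, 50 fetch failures for stage 2 (reading map stage 1) enqueue a single event, a failure for stage 4 (map stage 3) adds a second, and processing then resubmits stages 1 through 4 together.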