[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/1297#discussion_r17790219
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/IndexedRDDLike.scala ---
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.immutable.LongMap
+import scala.language.higherKinds
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import org.apache.spark.storage.StorageLevel
+
+import IndexedRDD.Id
+
+/**
+ * Contains members that are shared among all variants of IndexedRDD (e.g., IndexedRDD,
+ * VertexRDD).
+ *
+ * @tparam V the type of the values stored in the IndexedRDD
+ * @tparam P the type of the partitions making up the IndexedRDD
+ * @tparam Self the type of the implementing container. This allows transformation methods on any
+ * implementing container to yield a result of the same type.
+ */
+private[spark] trait IndexedRDDLike[
+@specialized(Long, Int, Double) V,
+P[X] <: IndexedRDDPartitionLike[X, P],
+Self[X] <: IndexedRDDLike[X, P, Self]]
+  extends RDD[(Id, V)] {
+
+  /** A generator for ClassTags of the value type V. */
+  protected implicit def vTag: ClassTag[V]
+
+  /** A generator for ClassTags of the partition type P. */
+  protected implicit def pTag[V2]: ClassTag[P[V2]]
+
+  /** Accessor for the IndexedRDD variant that is mixing in this trait. */
+  protected def self: Self[V]
+
+  /** The underlying representation of the IndexedRDD as an RDD of partitions. */
+  def partitionsRDD: RDD[P[V]]
+  require(partitionsRDD.partitioner.isDefined)
+
+  def withPartitionsRDD[V2: ClassTag](partitionsRDD: RDD[P[V2]]): Self[V2]
+
+  override val partitioner = partitionsRDD.partitioner
+
+  override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
+
+  override protected def getPreferredLocations(s: Partition): Seq[String] =
+partitionsRDD.preferredLocations(s)
+
+  override def persist(newLevel: StorageLevel): this.type = {
+partitionsRDD.persist(newLevel)
+this
+  }
+
+  override def unpersist(blocking: Boolean = true): this.type = {
+partitionsRDD.unpersist(blocking)
+this
+  }
+
+  override def count(): Long = {
+partitionsRDD.map(_.size).reduce(_ + _)
+  }
+
+  /** Provides the `RDD[(Id, V)]` equivalent output. */
+  override def compute(part: Partition, context: TaskContext): Iterator[(Id, V)] = {
+firstParent[P[V]].iterator(part, context).next.iterator
+  }
+
+  /** Gets the value corresponding to the specified key, if any. */
+  def get(k: Id): Option[V] = multiget(Array(k)).get(k)
+
+  /** Gets the values corresponding to the specified keys, if any. */
+  def multiget(ks: Array[Id]): Map[Id, V] = {
+val ksByPartition = ks.groupBy(k => self.partitioner.get.getPartition(k))
+val partitions = ksByPartition.keys.toSeq
+def unionMaps(maps: TraversableOnce[LongMap[V]]): LongMap[V] = {
+  maps.foldLeft(LongMap.empty[V]) {
+(accum, map) => accum.unionWith(map, (id, a, b) => a)
+  }
+}
+// TODO: avoid sending all keys to all partitions by creating and zipping an RDD of keys
--- End diff --

Would this be another use of the `bulkMultiget` I suggested in JIRA?
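For reference, a purely hypothetical sketch of what a `bulkMultiget` along the lines of the TODO might look like. The name comes from the JIRA discussion; none of these signatures exist in this PR, and `part.get` stands in for whatever per-partition lookup the partition class exposes:

```scala
// Hypothetical sketch only: route each key to its owning partition instead of
// broadcasting all keys to all partitions, then look the keys up locally.
def bulkMultiget(keys: RDD[Id]): RDD[(Id, V)] = {
  val routed = keys.map(k => (k, ())).partitionBy(partitioner.get)
  partitionsRDD.zipPartitions(routed, preservesPartitioning = true) { (partIter, keyIter) =>
    val part = partIter.next()
    keyIter.flatMap { case (k, _) => part.get(k).map(v => (k, v)) }  // part.get is assumed
  }
}
```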





[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/1297#discussion_r17791303
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ImmutableLongOpenHashSet.scala
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect._
+import com.google.common.hash.Hashing
+
+/**
+ * A fast, immutable hash set optimized for insertions and lookups (but not deletions) of `Long`
+ * elements. Because it exposes the position of a key in the underlying array, this is useful as a
+ * building block for higher level data structures such as a hash map (for example,
+ * IndexedRDDPartition).
+ *
+ * It uses quadratic probing with a power-of-2 hash table size, which is guaranteed to explore all
+ * spaces for each key (see http://en.wikipedia.org/wiki/Quadratic_probing).
+ */
+private[spark] class ImmutableLongOpenHashSet(
+/** Underlying array of elements used as a hash table. */
+val data: ImmutableVector[Long],
+/** Whether or not there is an element at the corresponding position in `data`. */
+val bitset: ImmutableBitSet,
+/**
+ * Position of a focused element. This is useful when returning a modified set along with a
+ * pointer to the location of modification.
+ */
+val focus: Int,
+/** Load threshold at which to grow the underlying vectors. */
+loadFactor: Double
+  ) extends Serializable {
+
+  require(loadFactor < 1.0, "Load factor must be less than 1.0")
+  require(loadFactor > 0.0, "Load factor must be greater than 0.0")
+  require(capacity == nextPowerOf2(capacity), "data capacity must be a power of 2")
+
+  import OpenHashSet.{INVALID_POS, NONEXISTENCE_MASK, POSITION_MASK, Hasher, LongHasher}
+
+  private val hasher: Hasher[Long] = new LongHasher
+
+  private def mask = capacity - 1
+  private def growThreshold = (loadFactor * capacity).toInt
+
+  def withFocus(focus: Int): ImmutableLongOpenHashSet =
+new ImmutableLongOpenHashSet(data, bitset, focus, loadFactor)
+
+  /** The number of elements in the set. */
+  def size: Int = bitset.cardinality
+
+  /** The capacity of the set (i.e. size of the underlying vector). */
+  def capacity: Int = data.size
+
+  /** Return true if this set contains the specified element. */
+  def contains(k: Long): Boolean = getPos(k) != INVALID_POS
+
+  /**
+   * Nondestructively add an element to the set, returning a new set. If the set is over capacity
+   * after the insertion, grows the set and rehashes all elements.
+   */
+  def add(k: Long): ImmutableLongOpenHashSet = {
+addWithoutResize(k).rehashIfNeeded(ImmutableLongOpenHashSet.grow, ImmutableLongOpenHashSet.move)
+  }
+
+  /**
+   * Add an element to the set. This one differs from add in that it doesn't trigger rehashing.
+   * The caller is responsible for calling rehashIfNeeded.
+   *
+   * Use (retval.focus & POSITION_MASK) to get the actual position, and
+   * (retval.focus & NONEXISTENCE_MASK) == 0 for prior existence.
+   */
+  def addWithoutResize(k: Long): ImmutableLongOpenHashSet = {
+var pos = hashcode(hasher.hash(k)) & mask
+var i = 1
+var result: ImmutableLongOpenHashSet = null
+while (result == null) {
+  if (!bitset.get(pos)) {
+// This is a new key.
+result = new ImmutableLongOpenHashSet(
+  data.updated(pos, k), bitset.set(pos), pos | NONEXISTENCE_MASK, loadFactor)
+  } else if (data(pos) == k) {
+// Found an existing key.
+result = this.withFocus(pos)
+  } else {
+val delta = i
+pos = (pos + delta) & mask
+i += 1
+  }
+}
+result
+  }
+
+  /**
+  

[GitHub] spark pull request: [SPARK-2365] Add IndexedRDD, an efficient upda...

2014-09-19 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/1297#issuecomment-56199798
  
This looks great! My comments are minor.

I know it's early to be discussing example docs, but I just wanted to mention that I can see caching being an area of confusion. E.g., you wouldn't want to serialize and cache each update to an IndexedRDD, since each cached version would be a full copy and would lose the benefits of the ImmutableVectors' structural sharing.
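To illustrate the concern, a rough sketch; the API names (`IndexedRDD(...)`, `put`) are taken loosely from this PR and may not match the final code exactly:

```scala
import org.apache.spark.storage.StorageLevel

// Sketch only: successive versions of an IndexedRDD share most of their structure.
val base = IndexedRDD(sc.parallelize((0L until 1000L).map(id => (id, 0))))

// Likely an anti-pattern: persisting each updated version with a serialized storage level
// materializes a full, independent copy per version, losing that structural sharing.
val v1 = base.put(1L, 42).persist(StorageLevel.MEMORY_ONLY_SER)

// Deserialized caching keeps the shared ImmutableVector structure in memory.
val v2 = base.put(2L, 43).persist(StorageLevel.MEMORY_ONLY)
```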





[GitHub] spark issue #16650: [SPARK-16554][CORE] Automatically Kill Executors and Nod...

2017-02-09 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16650
  
Merged to master, thanks @jsoltren.





[GitHub] spark pull request #16862: [SPARK-19520][streaming] Do not encrypt data writ...

2017-02-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16862#discussion_r100414780
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -758,9 +761,33 @@ private[spark] class BlockManager(
   blockId: BlockId,
   bytes: ChunkedByteBuffer,
   level: StorageLevel,
-  tellMaster: Boolean = true): Boolean = {
+  tellMaster: Boolean = true,
+  encrypt: Boolean = false): Boolean = {
--- End diff --

I think it's worth documenting this param. At first I was going to suggest that it should be called `allowEncryption` like the other one, but I realize it's more complicated than that. Maybe something like:

If true, the given bytes should be encrypted before they are stored. Note that in most cases, the given bytes will *already* be encrypted if encryption is on. An important exception to this is the streaming WAL: since the WAL does not support encryption, those bytes are generated unencrypted, but we still encrypt them before storing them in the block manager.

Maybe too wordy, but I think it's worth documenting.
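As scaladoc, that suggested wording might look roughly like this (illustrative formatting only, not the PR author's text):

```scala
/**
 * @param encrypt If true, the given bytes should be encrypted before they are stored. Note that
 *                in most cases the given bytes will *already* be encrypted if encryption is on.
 *                An important exception is the streaming WAL: since the WAL does not support
 *                encryption, those bytes are generated unencrypted, but we still encrypt them
 *                before storing them in the block manager.
 */
```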





[GitHub] spark pull request #16876: [SPARK-19537] Move pendingPartitions to ShuffleMa...

2017-02-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16876#discussion_r100468844
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1038,9 +1038,8 @@ class DAGScheduler(
 }
 
 if (tasks.size > 0) {
-  logInfo("Submitting " + tasks.size + " missing tasks from " + stage 
+ " (" + stage.rdd + ")")
-  stage.pendingPartitions ++= tasks.map(_.partitionId)
-  logDebug("New pending partitions: " + stage.pendingPartitions)
+  logInfo(s"Submitting ${tasks.size} missing tasks (for partitions " +
+s"${tasks.map(_.partitionId)}) from $stage (${stage.rdd})")
--- End diff --

I think including all the partitions could make this log line a little too long for an info message (what if there are 1k or 10k tasks?). I understand not wanting to stick it in a debug message (you often don't realize you want debug logging on until *after* something has failed), but I think this will be too much for regular runs.

Would it still be useful if you truncated it to the first 10 tasks, e.g. `tasks.take(10).map(...)`?
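A minimal sketch of that truncated variant, reusing `tasks` and `stage` from the quoted diff:

```scala
// Log only the first 10 partition ids to keep the info-level line bounded.
val shownPartitions = tasks.take(10).map(_.partitionId).mkString(", ")
val suffix = if (tasks.size > 10) ", ..." else ""
logInfo(s"Submitting ${tasks.size} missing tasks (first partitions: $shownPartitions$suffix) " +
  s"from $stage (${stage.rdd})")
```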





[GitHub] spark issue #16813: [SPARK-19466][CORE][SCHEDULER] Improve Fair Scheduler Lo...

2017-02-09 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16813
  
The failure is probably unrelated.

Jenkins, retest this please





[GitHub] spark issue #16813: [SPARK-19466][CORE][SCHEDULER] Improve Fair Scheduler Lo...

2017-02-09 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16813
  
@shaneknapp the build failed to trigger for me again, so I triggered it manually via spark-prs.appspot.com.





[GitHub] spark issue #16876: [SPARK-19537] Move pendingPartitions to ShuffleMapStage.

2017-02-09 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16876
  
lgtm when tests pass





[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16901#discussion_r101059074
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,63 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
 }
   }
 
+  test("After fetching failed, success of old attempt of stage should be 
taken as valid.") {
--- End diff --

Can you rename this to "After a fetch failure, success ..."? Really minor, but I had to read it twice.





[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16901#discussion_r101061681
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,63 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
 }
   }
 
+  test("After fetching failed, success of old attempt of stage should be 
taken as valid.") {
+// Create 3 RDDs with shuffle dependencies on each other: rddA <--- 
rddB <--- rddC
+val rddA = new MyRDD(sc, 2, Nil)
+val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+val shuffleIdA = shuffleDepA.shuffleId
+
+val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+submit(rddC, Array(0, 1))
+assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+// Complete both tasks in rddA.
+complete(taskSets(0), Seq(
+  (Success, makeMapStatus("hostA", 2)),
+  (Success, makeMapStatus("hostA", 2
+
+// Fetch failed on hostA for task(partitionId=0) and 
task(partitionId=1) is still running.
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(0),
+  FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+"Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
+  result = null))
+
+// Both original tasks in rddA should be marked as failed, because 
they ran on the
+// failed hostA, so both should be resubmitted. Complete them 
successfully.
+scheduler.resubmitFailedStages()
+assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1
+  && taskSets(2).tasks.size === 2)
+complete(taskSets(2), Seq(
+  (Success, makeMapStatus("hostB", 2)),
+  (Success, makeMapStatus("hostB", 2
+
+// Both tasks in rddB should be resubmitted, because none of them has 
succeeded.
+// Complete the task(partitionId=0) successfully. Task(partitionId=1) 
is still running.
+assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1 &&
+  taskSets(3).tasks.size === 2)
+runEvent(makeCompletionEvent(
+  taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+// Complete the task(partition=1) which is from the old 
attempt(stageId=1, stageAttempt=0)
+// successfully.
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+
+// Thanks to the success from old attempt of stage(stageId=1, 
stageAttempt=0), there's no
+// pending partitions for stage(stageId=1) now, thus downstream stage 
should be submitted,
+// though there's still a running task(stageId=1, stageAttempt=1, 
partitionId=1)
+// in the active stage attempt.
+assert(taskSets.size === 5 && 
taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
+complete(taskSets(4), Seq(
--- End diff --

I was going to suggest adding a check here to make sure that all prior TaskSetManagers are marked as zombies. But (a) you can't check that, since the DAGScheduler doesn't have a handle on the TaskSetManagers, and (b) more importantly, the prior TSMs actually are *not* marked as zombies, so they may continue to submit tasks even though they're not necessary.

I will file a separate bug for that -- it's a performance issue, not a correctness issue, so not critical. But this isn't the same as the old "kill running tasks when marking a TSM as a zombie" issue -- in this case, the problem is that the TSM may continue to launch *new* tasks.





[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16901#discussion_r101064021
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,63 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
 }
   }
 
+  test("After fetching failed, success of old attempt of stage should be 
taken as valid.") {
+// Create 3 RDDs with shuffle dependencies on each other: rddA <--- 
rddB <--- rddC
+val rddA = new MyRDD(sc, 2, Nil)
+val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+val shuffleIdA = shuffleDepA.shuffleId
+
+val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+submit(rddC, Array(0, 1))
+assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+// Complete both tasks in rddA.
+complete(taskSets(0), Seq(
+  (Success, makeMapStatus("hostA", 2)),
+  (Success, makeMapStatus("hostA", 2
+
+// Fetch failed on hostA for task(partitionId=0) and 
task(partitionId=1) is still running.
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(0),
+  FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+"Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
+  result = null))
+
+// Both original tasks in rddA should be marked as failed, because 
they ran on the
+// failed hostA, so both should be resubmitted. Complete them 
successfully.
+scheduler.resubmitFailedStages()
+assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1
+  && taskSets(2).tasks.size === 2)
+complete(taskSets(2), Seq(
+  (Success, makeMapStatus("hostB", 2)),
+  (Success, makeMapStatus("hostB", 2
+
+// Both tasks in rddB should be resubmitted, because none of them has 
succeeded.
+// Complete the task(partitionId=0) successfully. Task(partitionId=1) 
is still running.
+assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1 &&
+  taskSets(3).tasks.size === 2)
+runEvent(makeCompletionEvent(
+  taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+// Complete the task(partition=1) which is from the old 
attempt(stageId=1, stageAttempt=0)
+// successfully.
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+
+// Thanks to the success from old attempt of stage(stageId=1, 
stageAttempt=0), there's no
+// pending partitions for stage(stageId=1) now, thus downstream stage 
should be submitted,
+// though there's still a running task(stageId=1, stageAttempt=1, 
partitionId=1)
+// in the active stage attempt.
+assert(taskSets.size === 5 && 
taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
+complete(taskSets(4), Seq(
--- End diff --

https://issues.apache.org/jira/browse/SPARK-19596





[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

2017-02-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16620#discussion_r101070245
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1181,15 +1181,31 @@ class DAGScheduler(
 
   case smt: ShuffleMapTask =>
 val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
-shuffleStage.pendingPartitions -= task.partitionId
 updateAccumulators(event)
 val status = event.result.asInstanceOf[MapStatus]
 val execId = status.location.executorId
 logDebug("ShuffleMapTask finished on " + execId)
+if (stageIdToStage(task.stageId).latestInfo.attemptId == 
task.stageAttemptId) {
+  // This task was for the currently running attempt of the 
stage. Since the task
+  // completed successfully from the perspective of the 
TaskSetManager, mark it as
+  // no longer pending (the TaskSetManager may consider the 
task complete even
+  // when the output needs to be ignored because the task's 
epoch is too small below).
+  shuffleStage.pendingPartitions -= task.partitionId
--- End diff --

I think it's also worth explaining how this inconsistency between pendingPartitions and outputLocations gets resolved. IIUC, it's that when pendingPartitions becomes empty, the scheduler will check outputLocations, realize something is missing, and resubmit the stage.
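For example, the extra explanation could sit as a comment next to the quoted line; the wording below is a sketch of that suggestion, not the PR author's text:

```scala
// Sketch of the suggested comment, anchored on the line from the diff above.
shuffleStage.pendingPartitions -= task.partitionId
// If this task's output has to be ignored (e.g. its epoch is too old), pendingPartitions and
// the stage's output locations become temporarily inconsistent. That resolves itself once
// pendingPartitions is empty: the DAGScheduler re-checks the output locations, sees that a
// partition is still missing, and resubmits the stage.
```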





[GitHub] spark pull request #16930: [SPARK-19597][CORE] test case for task deserializ...

2017-02-14 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/16930

[SPARK-19597][CORE] test case  for task deserialization errors

Adds a test case that ensures that executors gracefully handle a task that fails to deserialize, by sending back a reasonable failure message. This does not change any behavior (the prior behavior was already correct); it just adds a test case to prevent regression.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark executor_task_deserialization

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16930.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16930









[GitHub] spark pull request #16639: [SPARK-19276][CORE] Fetch Failure handling robust...

2017-02-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16639#discussion_r101090765
  
--- Diff: core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala 
---
@@ -133,6 +123,153 @@ class ExecutorSuite extends SparkFunSuite {
   }
 }
   }
+
+  test("SPARK-19276: Handle Fetch Failed for all intervening user code") {
+val conf = new SparkConf().setMaster("local").setAppName("executor 
suite test")
+sc = new SparkContext(conf)
+
+val serializer = SparkEnv.get.closureSerializer.newInstance()
+val resultFunc = (context: TaskContext, itr: Iterator[Int]) => itr.size
+val inputRDD = new FakeShuffleRDD(sc)
+val secondRDD = new FetchFailureHidingRDD(sc, inputRDD)
+val taskBinary = sc.broadcast(serializer.serialize((secondRDD, 
resultFunc)).array())
+val serializedTaskMetrics = 
serializer.serialize(TaskMetrics.registered).array()
+val task = new ResultTask(
+  stageId = 1,
+  stageAttemptId = 0,
+  taskBinary = taskBinary,
+  partition = secondRDD.partitions(0),
+  locs = Seq(),
+  outputId = 0,
+  localProperties = new Properties(),
+  serializedTaskMetrics = serializedTaskMetrics
+)
+
+val serTask = serializer.serialize(task)
+val taskDescription = fakeTaskDescription(serTask)
+
+
+val failReason = runTaskAndGetFailReason(taskDescription)
+assert(failReason.isInstanceOf[FetchFailed])
+  }
+
+  test("Gracefully handle error in task deserialization") {
--- End diff --

@mridulm pointed out this bug in an earlier version of this PR, so I fixed the bug and added a test case. But in any case, I've separated this out into https://github.com/apache/spark/pull/16930 / https://issues.apache.org/jira/browse/SPARK-19597.





[GitHub] spark pull request #16639: [SPARK-19276][CORE] Fetch Failure handling robust...

2017-02-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16639#discussion_r101091688
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -400,8 +410,16 @@ private[spark] class Executor(
 execBackend.statusUpdate(taskId, TaskState.FINISHED, 
serializedResult)
 
   } catch {
-case ffe: FetchFailedException =>
-  val reason = ffe.toTaskFailedReason
+case t: Throwable if hasFetchFailure =>
+  val reason = task.context.fetchFailed.get.toTaskFailedReason
+  if (!t.isInstanceOf[FetchFailedException]) {
+// there was a fetch failure in the task, but some user code 
wrapped that exception
+// and threw something else.  Regardless, we treat it as a 
fetch failure.
+logWarning(s"TID ${taskId} encountered a 
${classOf[FetchFailedException]} and " +
+  s"failed, but did not directly throw the 
${classOf[FetchFailedException]}.  " +
+  s"Spark is still handling the fetch failure, but these 
exceptions should not be " +
+  s"intercepted by user code.")
--- End diff --

@mridulm @kayousterhout how is this message? Open to other suggestions -- I'm not sure exactly what to recommend to the user instead.





[GitHub] spark issue #16930: [SPARK-19597][CORE] test case for task deserialization e...

2017-02-14 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16930
  
Jenkins, retest this please





[GitHub] spark issue #16639: [SPARK-19276][CORE] Fetch Failure handling robust to use...

2017-02-14 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16639
  
Jenkins, retest this please





[GitHub] spark issue #16978: [SPARK-19652][UI] Do auth checks for REST API access.

2017-02-21 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16978
  
The code changes look fine, but is it possible to add a test case to HistoryServerSuite? I worry nobody is going to think about security (or know how to test it), so it may easily get broken.





[GitHub] spark issue #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-21 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16901
  
Sorry for responding late to this, but your analysis sounds fine.





[GitHub] spark pull request #16892: [SPARK-19560] Improve DAGScheduler tests.

2017-02-21 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16892#discussion_r102281985
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -1569,24 +1569,44 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
 assertDataStructuresEmpty()
   }
 
-  test("run trivial shuffle with out-of-band failure and retry") {
+  /**
+   * In this test, we run a map stage where one of the executors fails but 
we still receive a
+   * "zombie" complete message from a task that ran on that executor. We 
want to make sure the
+   * stage is resubmitted so that the task that ran on the failed executor 
is re-executed, and
+   * that the stage is only marked as finished once that task completes.
+   */
+  test("run trivial shuffle with out-of-band executor failure and retry") {
 val shuffleMapRdd = new MyRDD(sc, 2, Nil)
 val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(2))
 val shuffleId = shuffleDep.shuffleId
 val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = 
mapOutputTracker)
 submit(reduceRdd, Array(0))
-// blockManagerMaster.removeExecutor("exec-hostA")
-// pretend we were told hostA went away
+// Tell the DAGScheduler that hostA was lost.
 runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
-// DAGScheduler will immediately resubmit the stage after it appears 
to have no pending tasks
-// rather than marking it is as failed and waiting.
 complete(taskSets(0), Seq(
   (Success, makeMapStatus("hostA", 1)),
   (Success, makeMapStatus("hostB", 1
+
+// At this point, no more tasks are running for the stage (and the 
TaskSetManager considers the
+// stage complete), but the tasks that ran on HostA need to be re-run, 
so the DAGScheduler
+// should re-submit the stage.
+assert(taskSets.size === 2)
+
+// Make sure that the stage that was re-submitted was the 
ShuffleMapStage (not the reduce
+// stage, which shouldn't be run until all of the tasks in the 
ShuffleMapStage complete on
+// alive executors).
+assert(taskSets(1).tasks(0).isInstanceOf[ShuffleMapTask])
--- End diff --

Do you think it's worth adding

```scala
assert(taskSets(1).tasks.size === 1)
```
here, to make sure that only the one task is resubmitted, not both? If it weren't true, the test would fail later on anyway, but it might be helpful to get a more meaningful error message earlier. Not necessary; up to you whether it's worth adding.





[GitHub] spark pull request #16892: [SPARK-19560] Improve DAGScheduler tests.

2017-02-21 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16892#discussion_r102282650
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2031,6 +2051,11 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
* In this test, we run a map stage where one of the executors fails but 
we still receive a
* "zombie" complete message from that executor. We want to make sure 
the stage is not reported
* as done until all tasks have completed.
+   *
+   * Most of the functionality in this test is tested in "run trivial 
shuffle with out-of-band
+   * executor failure and retry".  However, that test uses 
ShuffleMapStages that are followed by
+   * a ResultStage, whereas in this test, the ShuffleMapStage is tested in 
isolation, without a
+   * ResultStage after it.
--- End diff --

I should have looked closer at this test earlier ... I was hoping this was testing a *multi*-stage map job. That would really be a better test; ideally we'd even have three stages, with a failure happening in the second stage and in the last stage.

In any case, your changes still look good; no need to do those other things now.





[GitHub] spark issue #16989: [SPARK-19659] Fetch big blocks to disk when shuffle-read...

2017-02-21 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16989
  
Hi @jinxing64, I posted a comment on JIRA about the design -- I think this is a big enough change that it's worth discussing the design first. It's fine to keep working on the code as a demonstration if you want, but for now I'd ask that you label this a work-in-progress "[WIP]". (I personally have only briefly glanced at the code and am unlikely to look more closely until we sort out the design issues.)

FWIW I think this will be a great feature, we just need to be thoughtful about it.





[GitHub] spark issue #16989: [SPARK-19659] Fetch big blocks to disk when shuffle-read...

2017-02-21 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16989
  
Jenkins, add to whitelist





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-02-21 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r102293021
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -911,14 +916,14 @@ private[spark] class TaskSetManager(
 logDebug("Checking for speculative tasks: minFinished = " + 
minFinishedForSpeculation)
 if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 
0) {
   val time = clock.getTimeMillis()
-  val durations = 
taskInfos.values.filter(_.successful).map(_.duration).toArray
-  Arrays.sort(durations)
+  val durations = successfulTasksSet.toArray.map(taskInfos(_).duration)
   val medianDuration = durations(min((0.5 * 
tasksSuccessful).round.toInt, durations.length - 1))
   val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 
minTimeToSpeculation)
   // TODO: Threshold should also look at standard deviation of task 
durations and have a lower
   // bound based on that.
   logDebug("Task length threshold for speculation: " + threshold)
-  for ((tid, info) <- taskInfos) {
+  for (tid <- runningTasksSet) {
+val info = taskInfos(tid)
--- End diff --

oh, good find, this alone looks like a worthwhile fix.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-02-21 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r102292835
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -911,14 +916,14 @@ private[spark] class TaskSetManager(
 logDebug("Checking for speculative tasks: minFinished = " + 
minFinishedForSpeculation)
 if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 
0) {
   val time = clock.getTimeMillis()
-  val durations = 
taskInfos.values.filter(_.successful).map(_.duration).toArray
-  Arrays.sort(durations)
+  val durations = successfulTasksSet.toArray.map(taskInfos(_).duration)
--- End diff --

There are some other things you could do to make the original code more efficient:
a) instead of `.values.filter.map`, use a `foreach` to directly add into an `ArrayBuffer`. That avoids all the intermediate collections that Scala would create otherwise (see the sketch below).
b) store an approximate distribution of the runtimes, e.g. using a t-digest.

Aside: what a pain that there is no quick way to get the middle element of a TreeSet -- I couldn't find anything efficient in either the Java or Scala libraries :(
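A minimal sketch of option (a), assuming the `taskInfos` and `tasksSuccessful` fields from the quoted diff:

```scala
import scala.collection.mutable.ArrayBuffer

// Collect successful task durations in one pass, without intermediate collections.
val durations = new ArrayBuffer[Long](taskInfos.size)
taskInfos.values.foreach { info =>
  if (info.successful) durations += info.duration
}
val sorted = durations.toArray
java.util.Arrays.sort(sorted)
val medianDuration = sorted(math.min((0.5 * tasksSuccessful).round.toInt, sorted.length - 1))
```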





[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-02-21 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16867
  
Jenkins, ok to test





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-02-21 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r102292628
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -137,6 +137,12 @@ private[spark] class TaskSetManager(
   // Task index, start and finish time for each task attempt (indexed by 
task ID)
   val taskInfos = new HashMap[Long, TaskInfo]
 
+  val successfulTasksSet = new scala.collection.mutable.TreeSet[Long] {
--- End diff --

This can't be a set, since multiple tasks might run for the same amount of time. You could change it to a set of `(time, count)` pairs, which would make the update logic a bit more complicated, or just keep a list of all runtimes (see the sketch below).

Also, the name should indicate that it holds durations, e.g. `successfulTaskDurations`.
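A minimal sketch of the simpler option -- keep every duration, duplicates included, rather than a set; the surrounding update site is assumed:

```scala
// All durations of successfully finished tasks, duplicates allowed.
val successfulTaskDurations = new scala.collection.mutable.ArrayBuffer[Long]

// At the point where a task is marked successful (info is that task's TaskInfo):
// successfulTaskDurations += info.duration
```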





[GitHub] spark issue #16978: [SPARK-19652][UI] Do auth checks for REST API access.

2017-02-21 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16978
  
lgtm assuming tests pass





[GitHub] spark pull request #16946: [SPARK-19554][UI,YARN] Allow SHS URL to be used f...

2017-02-22 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16946#discussion_r102546890
  
--- Diff: docs/running-on-yarn.md ---
@@ -604,3 +604,17 @@ spark.yarn.am.extraJavaOptions 
-Dsun.security.krb5.debug=true -Dsun.security.spn
 
 Finally, if the log level for `org.apache.spark.deploy.yarn.Client` is set 
to `DEBUG`, the log
 will include a list of all tokens obtained, and their expiry details
+
+## Using the Spark History Server to replace the Spark Web UI
+
+It is possible to use the Spark History Server application page as the 
tracking URL for running
+applications in scenarios where it may be desired to disable the built-in 
application UI. Two steps
--- End diff --

Nit: the first sentence reads a little funny. Maybe rephrase to:

It is possible to use the Spark History Server application page as the tracking URL for running applications where the built-in application UI is disabled. This may be desirable on secure clusters, or to avoid the memory usage of the UI on the driver.

Up to you; maybe it doesn't even need the second sentence.





[GitHub] spark pull request #16946: [SPARK-19554][UI,YARN] Allow SHS URL to be used f...

2017-02-22 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16946#discussion_r102539475
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnProxyRedirectFilterSuite.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.{PrintWriter, StringWriter}
+import javax.servlet.FilterChain
+import javax.servlet.http.{Cookie, HttpServletRequest, HttpServletResponse}
+
+import org.mockito.Mockito._
+
+import org.apache.spark.SparkFunSuite
+
+class YarnProxyRedirectFilterSuite extends SparkFunSuite {
+
+  test("redirect proxied requests, pass-through others") {
+val requestURL = "http://example.com:1234/foo?";
+val filter = new YarnProxyRedirectFilter()
+val cookies = Array(new Cookie(YarnProxyRedirectFilter.COOKIE_NAME, 
"dr.who"))
+
+val req = mock(classOf[HttpServletRequest])
+when(req.getCookies()).thenReturn(cookies, null)
--- End diff --

I was really confused by this test at first -- I didn't know that `thenReturn` lets you specify multiple values for consecutive calls. For anyone else as clueless as me, it would be helpful to drop in a comment here to draw attention to this, e.g. "First request has cookies with a user name; the second request does not".
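A tiny self-contained illustration of that Mockito behavior (unrelated to the suite itself; the mocked type is arbitrary):

```scala
import org.mockito.Mockito._

// thenReturn with several arguments stubs consecutive calls:
// the first call yields "with-cookie", every later call yields "no-cookie".
val fakeRequest = mock(classOf[java.util.function.Supplier[String]])
when(fakeRequest.get()).thenReturn("with-cookie", "no-cookie")
assert(fakeRequest.get() == "with-cookie")  // first call
assert(fakeRequest.get() == "no-cookie")    // second call
assert(fakeRequest.get() == "no-cookie")    // later calls keep returning the last value
```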





[GitHub] spark pull request #16892: [SPARK-19560] Improve DAGScheduler tests.

2017-02-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16892#discussion_r102996593
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2031,6 +2051,11 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
* In this test, we run a map stage where one of the executors fails but 
we still receive a
* "zombie" complete message from that executor. We want to make sure 
the stage is not reported
* as done until all tasks have completed.
+   *
+   * Most of the functionality in this test is tested in "run trivial 
shuffle with out-of-band
+   * executor failure and retry".  However, that test uses 
ShuffleMapStages that are followed by
+   * a ResultStage, whereas in this test, the ShuffleMapStage is tested in 
isolation, without a
+   * ResultStage after it.
--- End diff --

The *final* map stage can't have anything that follows it, but the job overall can still have multiple stages, and the failure can occur during the processing of those earlier map stages or the final one.

In any case, I agree you don't need to expand that test in this PR. And even though this test doesn't do as much as I was hoping, I still think it adds value and is worth leaving in, even though it's very similar to the other test.





[GitHub] spark issue #16930: [SPARK-19597][CORE] test case for task deserialization e...

2017-02-24 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16930
  
thanks @kayousterhout !





[GitHub] spark issue #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage policies tes...

2017-01-03 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/15604
  
Jenkins, ok to test





[GitHub] spark pull request #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage polic...

2017-01-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15604#discussion_r94497574
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala ---
@@ -178,4 +180,97 @@ class PoolSuite extends SparkFunSuite with 
LocalSparkContext {
 scheduleTaskAndVerifyId(2, rootPool, 6)
 scheduleTaskAndVerifyId(3, rootPool, 2)
   }
+
+  test("SPARK-18066: FIFO Scheduler just uses root pool") {
+sc = new SparkContext("local", "PoolSuite")
+val taskScheduler = new TaskSchedulerImpl(sc)
+
+val rootPool = new Pool("", SchedulingMode.FIFO, initMinShare = 0, 
initWeight = 0)
+val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
+
+val taskSetManager0 = createTaskSetManager(stageId = 0, numTasks = 1, 
taskScheduler)
+val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, 
taskScheduler)
+
+val properties = new Properties()
+properties.setProperty("spark.scheduler.pool", TEST_POOL)
+
+// FIFOSchedulableBuilder just uses rootPool so even if properties are 
set, related pool
+// (testPool) is not created and TaskSetManagers are added to rootPool
+schedulableBuilder.addTaskSetManager(taskSetManager0, properties)
+schedulableBuilder.addTaskSetManager(taskSetManager1, properties)
+
+assert(rootPool.getSchedulableByName(TEST_POOL) == null)
+assert(rootPool.schedulableQueue.size == 2)
+assert(rootPool.getSchedulableByName(taskSetManager0.name) === 
taskSetManager0)
+assert(rootPool.getSchedulableByName(taskSetManager1.name) === 
taskSetManager1)
+  }
+
+  test("SPARK-18066: FAIR Scheduler uses default pool when 
spark.scheduler.pool property is not " +
+"set") {
+sc = new SparkContext("local", "PoolSuite")
+val taskScheduler = new TaskSchedulerImpl(sc)
+
+val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, 
initWeight = 0)
+val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+schedulableBuilder.buildPools()
+
+// FAIR Scheduler uses default pool when pool properties are null
+val taskSetManager0 = createTaskSetManager(stageId = 0, numTasks = 1, 
taskScheduler)
+
+schedulableBuilder.addTaskSetManager(taskSetManager0, null)
+
+val defaultPool = 
rootPool.getSchedulableByName(schedulableBuilder.DEFAULT_POOL_NAME)
+assert(defaultPool != null)
+assert(defaultPool.schedulableQueue.size == 1)
+assert(defaultPool.getSchedulableByName(taskSetManager0.name) === 
taskSetManager0)
+
+// FAIR Scheduler uses default pool when spark.scheduler.pool property 
is not set
+val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, 
taskScheduler)
+
+schedulableBuilder.addTaskSetManager(taskSetManager1, new Properties())
+
+assert(defaultPool.schedulableQueue.size == 2)
+assert(defaultPool.getSchedulableByName(taskSetManager1.name) === 
taskSetManager1)
+
+// FAIR Scheduler uses default pool when spark.scheduler.pool property 
is set as default pool
--- End diff --

I'm all for more tests, but this final case seems pretty unnecessary.





[GitHub] spark pull request #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage polic...

2017-01-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15604#discussion_r94499675
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala ---
@@ -178,4 +180,97 @@ class PoolSuite extends SparkFunSuite with 
LocalSparkContext {
 scheduleTaskAndVerifyId(2, rootPool, 6)
 scheduleTaskAndVerifyId(3, rootPool, 2)
   }
+
+  test("SPARK-18066: FIFO Scheduler just uses root pool") {
--- End diff --

super nit: the JIRA doesn't really say anything other than "add these tests", so there isn't much point in listing it here; just name the test "FIFO Scheduler just uses root pool".





[GitHub] spark pull request #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage polic...

2017-01-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15604#discussion_r94499385
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala ---
@@ -178,4 +180,97 @@ class PoolSuite extends SparkFunSuite with 
LocalSparkContext {
 scheduleTaskAndVerifyId(2, rootPool, 6)
 scheduleTaskAndVerifyId(3, rootPool, 2)
   }
+
+  test("SPARK-18066: FIFO Scheduler just uses root pool") {
+sc = new SparkContext("local", "PoolSuite")
+val taskScheduler = new TaskSchedulerImpl(sc)
+
+val rootPool = new Pool("", SchedulingMode.FIFO, initMinShare = 0, 
initWeight = 0)
+val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
+
+val taskSetManager0 = createTaskSetManager(stageId = 0, numTasks = 1, 
taskScheduler)
+val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, 
taskScheduler)
+
+val properties = new Properties()
+properties.setProperty("spark.scheduler.pool", TEST_POOL)
+
+// FIFOSchedulableBuilder just uses rootPool so even if properties are 
set, related pool
+// (testPool) is not created and TaskSetManagers are added to rootPool
+schedulableBuilder.addTaskSetManager(taskSetManager0, properties)
+schedulableBuilder.addTaskSetManager(taskSetManager1, properties)
+
+assert(rootPool.getSchedulableByName(TEST_POOL) == null)
+assert(rootPool.schedulableQueue.size == 2)
+assert(rootPool.getSchedulableByName(taskSetManager0.name) === 
taskSetManager0)
+assert(rootPool.getSchedulableByName(taskSetManager1.name) === 
taskSetManager1)
+  }
+
+  test("SPARK-18066: FAIR Scheduler uses default pool when 
spark.scheduler.pool property is not " +
+"set") {
+sc = new SparkContext("local", "PoolSuite")
+val taskScheduler = new TaskSchedulerImpl(sc)
+
+val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, 
initWeight = 0)
+val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+schedulableBuilder.buildPools()
+
+// FAIR Scheduler uses default pool when pool properties are null
+val taskSetManager0 = createTaskSetManager(stageId = 0, numTasks = 1, 
taskScheduler)
+
+schedulableBuilder.addTaskSetManager(taskSetManager0, null)
+
+val defaultPool = 
rootPool.getSchedulableByName(schedulableBuilder.DEFAULT_POOL_NAME)
+assert(defaultPool != null)
+assert(defaultPool.schedulableQueue.size == 1)
+assert(defaultPool.getSchedulableByName(taskSetManager0.name) === 
taskSetManager0)
+
+// FAIR Scheduler uses default pool when spark.scheduler.pool property 
is not set
+val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, 
taskScheduler)
+
+schedulableBuilder.addTaskSetManager(taskSetManager1, new Properties())
+
+assert(defaultPool.schedulableQueue.size == 2)
+assert(defaultPool.getSchedulableByName(taskSetManager1.name) === 
taskSetManager1)
+
+// FAIR Scheduler uses default pool when spark.scheduler.pool property 
is set as default pool
+val taskSetManager2 = createTaskSetManager(stageId = 2, numTasks = 1, 
taskScheduler)
+
+val properties = new Properties()
+properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, 
schedulableBuilder
+  .DEFAULT_POOL_NAME)
+
+schedulableBuilder.addTaskSetManager(taskSetManager2, properties)
+
+assert(defaultPool.schedulableQueue.size == 3)
+assert(defaultPool.getSchedulableByName(taskSetManager2.name) === 
taskSetManager2)
+  }
+
+  test("SPARK-18066: FAIR Scheduler creates a new pool when 
spark.scheduler.pool property points " +
+"non-existent") {
--- End diff --

minor nit: rename to "FAIR Scheduler creates a new pool when spark.scheduler.pool property points to a non-existent pool"

bigger question:
@markhamstra @kayousterhout Is this really the desired behavior?  Or is there a bug -- should it fail fast?  It looks like [this behavior was intentional](https://github.com/apache/spark/blob/b67b35f76b684c5176dc683e7491fd01b43f4467/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala#L143-L145).  Then again, that comment makes it sound like the user is going to directly modify the `weight` and `minShare` of the constructed pool, but although [they are `var`s](https://github.com/apache/spark/blob/b67b35f76b684c5176dc683e7491fd01b43f4467/core/src/main/scala/org/apache/spark/scheduler/Pool.scala#L40-L41), they are `private[spark]` and nothing else exposes them to the user.

I prefer fail-fast behavior, but it seems like it may be too late to change that now.
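For concreteness, the fail-fast variant I'm describing would look roughly like this inside `FairSchedulableBuilder.addTaskSetManager` -- a sketch only, assuming the current constant names (`FAIR_SCHEDULER_PROPERTIES`, `DEFAULT_POOL_NAME`), not a proposed patch:

    override def addTaskSetManager(manager: Schedulable, properties: Properties): Unit = {
      val poolName = if (properties != null) {
        properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
      } else {
        DEFAULT_POOL_NAME
      }
      val parentPool = rootPool.getSchedulableByName(poolName)
      if (parentPool == null) {
        // Fail fast instead of silently creating a pool with default minShare/weight.
        throw new SparkException(
          s"Pool $poolName is not defined in the fair scheduler allocation file")
      }
      parentPool.addSchedulable(manager)
    }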

[GitHub] spark pull request #15237: [SPARK-17663] [CORE] SchedulableBuilder should ha...

2017-01-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15237#discussion_r94501620
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala ---
@@ -102,38 +103,52 @@ private[spark] class FairSchedulableBuilder(val 
rootPool: Pool, conf: SparkConf)
 for (poolNode <- (xml \\ POOLS_PROPERTY)) {
 
   val poolName = (poolNode \ POOL_NAME_PROPERTY).text
-  var schedulingMode = DEFAULT_SCHEDULING_MODE
-  var minShare = DEFAULT_MINIMUM_SHARE
-  var weight = DEFAULT_WEIGHT
-
-  val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
-  if (xmlSchedulingMode != "") {
-try {
-  schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
-} catch {
-  case e: NoSuchElementException =>
-logWarning(s"Unsupported schedulingMode: $xmlSchedulingMode, " 
+
-  s"using the default schedulingMode: $schedulingMode")
-}
-  }
 
-  val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
-  if (xmlMinShare != "") {
-minShare = xmlMinShare.toInt
-  }
+  val xmlSchedulingMode = (poolNode \ 
SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase
+  val schedulingMode = getSchedulingModeValue(xmlSchedulingMode, 
DEFAULT_SCHEDULING_MODE)
 
-  val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
-  if (xmlWeight != "") {
-weight = xmlWeight.toInt
-  }
+  val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text.trim
+  val minShare = getIntValue(MINIMUM_SHARES_PROPERTY, xmlMinShare, 
DEFAULT_MINIMUM_SHARE)
+
+  val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text.trim
+  val weight = getIntValue(WEIGHT_PROPERTY, xmlWeight, DEFAULT_WEIGHT)
+
+  rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, 
weight))
 
-  val pool = new Pool(poolName, schedulingMode, minShare, weight)
-  rootPool.addSchedulable(pool)
   logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: 
%d".format(
 poolName, schedulingMode, minShare, weight))
 }
   }
 
+  private def getSchedulingModeValue(data: String, defaultValue: 
SchedulingMode): SchedulingMode = {
+val warningMessage = s"Unsupported schedulingMode: $data, using the 
default schedulingMode: " +
+  s"$defaultValue"
+try {
+  if (SchedulingMode.withName(data) != SchedulingMode.NONE) {
+SchedulingMode.withName(data)
+  } else {
+logWarning(warningMessage)
+defaultValue
+  }
+} catch {
+  case e: NoSuchElementException =>
+logWarning(warningMessage)
+defaultValue
+}
+  }
+
+  private def getIntValue(propertyName: String, data: String, 
defaultValue: Int): Int = {
+try {
+  data.toInt
+} catch {
+  case e: NumberFormatException =>
+logWarning(s"Error while loading scheduler allocation file at 
$schedulerAllocFile. " +
--- End diff --

ugh, this is kind of a nuisance, but I realize now that `schedulerAllocFile` isn't necessarily the right file -- it might be empty, and there might be a `fairscheduler.xml` sitting on the classpath.  Can you get the right file name in both cases?  (Better to leave the message with no filename at all than to name the wrong one.)

And can the warning include the pool name as well?

Finally, it would be nice to add this extra info to the scheduling-mode warning too.
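Something along these lines is what I have in mind -- a minimal sketch, assuming `schedulerAllocFile` is the usual `Option[String]` and `DEFAULT_SCHEDULER_FILE` names the classpath default; the `poolConfigSource` helper and the reworked `getIntValue` signature are hypothetical:

    // Hypothetical helper: name the source the pool config actually came from, so the
    // warning points at the right file whether it was user-specified or the default.
    private def poolConfigSource: String = schedulerAllocFile
      .map(file => s"user-specified allocation file $file")
      .getOrElse(s"default allocation file $DEFAULT_SCHEDULER_FILE on the classpath")

    private def getIntValue(
        poolName: String, propertyName: String, data: String, defaultValue: Int): Int = {
      try {
        data.toInt
      } catch {
        case _: NumberFormatException =>
          // Mentions the source, the pool and the bad value, not just the file property.
          logWarning(s"Error while loading $poolConfigSource: $propertyName for pool " +
            s"$poolName is invalid ('$data'), using the default value $defaultValue")
          defaultValue
      }
    }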





[GitHub] spark pull request #15237: [SPARK-17663] [CORE] SchedulableBuilder should ha...

2017-01-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15237#discussion_r94501723
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala ---
@@ -102,38 +103,52 @@ private[spark] class FairSchedulableBuilder(val 
rootPool: Pool, conf: SparkConf)
 for (poolNode <- (xml \\ POOLS_PROPERTY)) {
 
   val poolName = (poolNode \ POOL_NAME_PROPERTY).text
-  var schedulingMode = DEFAULT_SCHEDULING_MODE
-  var minShare = DEFAULT_MINIMUM_SHARE
-  var weight = DEFAULT_WEIGHT
-
-  val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
-  if (xmlSchedulingMode != "") {
-try {
-  schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
-} catch {
-  case e: NoSuchElementException =>
-logWarning(s"Unsupported schedulingMode: $xmlSchedulingMode, " 
+
-  s"using the default schedulingMode: $schedulingMode")
-}
-  }
 
-  val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
-  if (xmlMinShare != "") {
-minShare = xmlMinShare.toInt
-  }
+  val xmlSchedulingMode = (poolNode \ 
SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase
+  val schedulingMode = getSchedulingModeValue(xmlSchedulingMode, 
DEFAULT_SCHEDULING_MODE)
--- End diff --

this is minor, but if you're going to have a helper method here, can you do *all* the parsing inside it?  Include the one line above, `(poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase`.

Same goes for `getIntValue`.
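Concretely, something like this -- a sketch that keeps the constants and logging of the surrounding code:

    private def getSchedulingModeValue(
        poolNode: scala.xml.Node, defaultValue: SchedulingMode): SchedulingMode = {
      // Pull the raw text out of the XML here, so the caller never touches it.
      val data = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase
      val warningMessage = s"Unsupported schedulingMode: $data, using the default: $defaultValue"
      try {
        val mode = SchedulingMode.withName(data)
        if (mode != SchedulingMode.NONE) {
          mode
        } else {
          logWarning(warningMessage)
          defaultValue
        }
      } catch {
        case _: NoSuchElementException =>
          logWarning(warningMessage)
          defaultValue
      }
    }

    private def getIntValue(
        poolNode: scala.xml.Node, propertyName: String, defaultValue: Int): Int = {
      val data = (poolNode \ propertyName).text.trim
      try {
        data.toInt
      } catch {
        case _: NumberFormatException =>
          logWarning(s"Invalid value '$data' for $propertyName, using the default: $defaultValue")
          defaultValue
      }
    }

The call sites then collapse to `getSchedulingModeValue(poolNode, DEFAULT_SCHEDULING_MODE)` and `getIntValue(poolNode, MINIMUM_SHARES_PROPERTY, DEFAULT_MINIMUM_SHARE)`.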





[GitHub] spark pull request #15237: [SPARK-17663] [CORE] SchedulableBuilder should ha...

2017-01-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15237#discussion_r94500060
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala ---
@@ -178,4 +176,39 @@ class PoolSuite extends SparkFunSuite with 
LocalSparkContext {
 scheduleTaskAndVerifyId(2, rootPool, 6)
 scheduleTaskAndVerifyId(3, rootPool, 2)
   }
+
+  test("SPARK-17663: FairSchedulableBuilder sets default values for blank 
or invalid datas") {
+val xmlPath = 
getClass.getClassLoader.getResource("fairscheduler-with-invalid-data.xml")
+  .getFile()
+val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, 
xmlPath)
+
+val rootPool = new Pool("", FAIR, 0, 0)
+val schedulableBuilder = new FairSchedulableBuilder(rootPool, conf)
+schedulableBuilder.buildPools()
+
+verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
+verifyPool(rootPool, "pool_with_invalid_min_share", 0, 2, FAIR)
+verifyPool(rootPool, "pool_with_invalid_weight", 1, 1, FAIR)
+verifyPool(rootPool, "pool_with_invalid_scheduling_mode", 3, 2, FIFO)
+verifyPool(rootPool, "pool_with_non_uppercase_scheduling_mode", 2, 1, 
FAIR)
+verifyPool(rootPool, "pool_with_NONE_scheduling_mode", 1, 2, FIFO)
+verifyPool(rootPool, "pool_with_whitespace_min_share", 0, 2, FAIR)
+verifyPool(rootPool, "pool_with_whitespace_weight", 1, 1, FAIR)
+verifyPool(rootPool, "pool_with_whitespace_scheduling_mode", 3, 2, 
FIFO)
+verifyPool(rootPool, "pool_with_empty_min_share", 0, 3, FAIR)
+verifyPool(rootPool, "pool_with_empty_weight", 2, 1, FAIR)
+verifyPool(rootPool, "pool_with_empty_scheduling_mode", 2, 2, FIFO)
+verifyPool(rootPool, "pool_with_min_share_surrounded_whitespace", 3, 
2, FAIR)
+verifyPool(rootPool, "pool_with_weight_surrounded_whitespace", 1, 2, 
FAIR)
+verifyPool(rootPool, 
"pool_with_scheduling_mode_surrounded_whitespace", 3, 2, FAIR)
--- End diff --

nit: can you just combine all these "surrounded_whitespace" cases into one 
for all the properties?
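For example, with a single hypothetical combined entry (the expected values below are made up):

    // One combined entry in fairscheduler-with-invalid-data.xml, e.g.
    //   <pool name="pool_with_surrounded_whitespace">
    //     <minShare> 3 </minShare>
    //     <weight> 2 </weight>
    //     <schedulingMode> FAIR </schedulingMode>
    //   </pool>
    // would replace the three separate whitespace pools and need only one assertion:
    verifyPool(rootPool, "pool_with_surrounded_whitespace", 3, 2, FAIR)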





[GitHub] spark issue #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage policies tes...

2017-01-03 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/15604
  
hmm, so I'm thinking about this again, along with your other PR 
https://github.com/apache/spark/pull/15237

and it got me thinking -- there also aren't any tests for a mix of FIFO and FAIR (at least, I don't see one in "Nested Pool Test" in PoolSuite).  While you are at it, would you like to add some tests for that as well?  (To be perfectly honest, I need to spend a little time thinking about what that behavior should even be, but I will have to do that a little later.)
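To make that concrete, the shape I'm imagining is roughly the following -- a sketch only, with placeholder pool names and shares, since the expected interleaving is exactly the part that still needs to be pinned down:

    test("Nested pools with mixed FIFO and FAIR scheduling modes") {
      sc = new SparkContext("local", "PoolSuite")
      val taskScheduler = new TaskSchedulerImpl(sc)

      // FAIR at the root, with one FIFO child pool and one FAIR child pool.
      val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0)
      val fifoPool = new Pool("fifoPool", SchedulingMode.FIFO, initMinShare = 2, initWeight = 1)
      val fairPool = new Pool("fairPool", SchedulingMode.FAIR, initMinShare = 2, initWeight = 1)
      rootPool.addSchedulable(fifoPool)
      rootPool.addSchedulable(fairPool)

      fifoPool.addSchedulable(createTaskSetManager(stageId = 0, numTasks = 2, taskScheduler))
      fifoPool.addSchedulable(createTaskSetManager(stageId = 1, numTasks = 2, taskScheduler))
      fairPool.addSchedulable(createTaskSetManager(stageId = 2, numTasks = 2, taskScheduler))

      // TODO: assert on the scheduling order (e.g. via scheduleTaskAndVerifyId) once we
      // agree on what the interleaving between the FIFO and FAIR children should be.
    }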





[GitHub] spark pull request #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage polic...

2017-01-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15604#discussion_r94670256
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala ---
@@ -178,4 +180,97 @@ class PoolSuite extends SparkFunSuite with 
LocalSparkContext {
 scheduleTaskAndVerifyId(2, rootPool, 6)
 scheduleTaskAndVerifyId(3, rootPool, 2)
   }
+
+  test("SPARK-18066: FIFO Scheduler just uses root pool") {
+sc = new SparkContext("local", "PoolSuite")
+val taskScheduler = new TaskSchedulerImpl(sc)
+
+val rootPool = new Pool("", SchedulingMode.FIFO, initMinShare = 0, 
initWeight = 0)
+val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
+
+val taskSetManager0 = createTaskSetManager(stageId = 0, numTasks = 1, 
taskScheduler)
+val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, 
taskScheduler)
+
+val properties = new Properties()
+properties.setProperty("spark.scheduler.pool", TEST_POOL)
+
+// FIFOSchedulableBuilder just uses rootPool so even if properties are 
set, related pool
+// (testPool) is not created and TaskSetManagers are added to rootPool
+schedulableBuilder.addTaskSetManager(taskSetManager0, properties)
+schedulableBuilder.addTaskSetManager(taskSetManager1, properties)
+
+assert(rootPool.getSchedulableByName(TEST_POOL) == null)
+assert(rootPool.schedulableQueue.size == 2)
+assert(rootPool.getSchedulableByName(taskSetManager0.name) === 
taskSetManager0)
+assert(rootPool.getSchedulableByName(taskSetManager1.name) === 
taskSetManager1)
+  }
+
+  test("SPARK-18066: FAIR Scheduler uses default pool when 
spark.scheduler.pool property is not " +
+"set") {
+sc = new SparkContext("local", "PoolSuite")
+val taskScheduler = new TaskSchedulerImpl(sc)
+
+val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, 
initWeight = 0)
+val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+schedulableBuilder.buildPools()
+
+// FAIR Scheduler uses default pool when pool properties are null
+val taskSetManager0 = createTaskSetManager(stageId = 0, numTasks = 1, 
taskScheduler)
+
+schedulableBuilder.addTaskSetManager(taskSetManager0, null)
+
+val defaultPool = 
rootPool.getSchedulableByName(schedulableBuilder.DEFAULT_POOL_NAME)
+assert(defaultPool != null)
+assert(defaultPool.schedulableQueue.size == 1)
+assert(defaultPool.getSchedulableByName(taskSetManager0.name) === 
taskSetManager0)
+
+// FAIR Scheduler uses default pool when spark.scheduler.pool property 
is not set
+val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, 
taskScheduler)
+
+schedulableBuilder.addTaskSetManager(taskSetManager1, new Properties())
+
+assert(defaultPool.schedulableQueue.size == 2)
+assert(defaultPool.getSchedulableByName(taskSetManager1.name) === 
taskSetManager1)
+
+// FAIR Scheduler uses default pool when spark.scheduler.pool property 
is set as default pool
+val taskSetManager2 = createTaskSetManager(stageId = 2, numTasks = 1, 
taskScheduler)
+
+val properties = new Properties()
+properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, 
schedulableBuilder
+  .DEFAULT_POOL_NAME)
+
+schedulableBuilder.addTaskSetManager(taskSetManager2, properties)
+
+assert(defaultPool.schedulableQueue.size == 3)
+assert(defaultPool.getSchedulableByName(taskSetManager2.name) === 
taskSetManager2)
+  }
+
+  test("SPARK-18066: FAIR Scheduler creates a new pool when 
spark.scheduler.pool property points " +
+"non-existent") {
--- End diff --

yeah, I prefer failing fast for the same reason.  But I guess we can't actually change this, since there is a valid use case for the current behavior.  We don't really know if others are relying on it, and I don't think we should change it out from under them in the next release.

In any case, I still think we should also change `Pool.weight` and `Pool.minShare` to `val`s.





[GitHub] spark issue #16376: [SPARK-18967][SCHEDULER] compute locality levels even if...

2017-01-05 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16376
  
Thanks for the feedback.  I've updated the comment on `myLocalityLevels`.

I also updated the tests slightly.  To ensure that we're really testing no delay, the tests now use a manual clock which never advances.  And to address @lirui-intel's concern, I added a test that we still schedule at non-preferred locations immediately with delay scheduling off.  (Somewhat obvious in the current implementation since we always add `ANY`, but I think it's a good regression test if nothing else.)
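For anyone skimming, the relevant bit of the setup is just this (the constructor arguments mirror the diff quoted later in this thread; the surrounding `taskScheduler` and `taskSet` come from the test):

    // Build the TaskSetManager with a ManualClock that is never advanced, so any
    // scheduling at a non-preferred location can't be explained by the locality
    // wait expiring.
    val clock = new ManualClock()
    val manager = new TaskSetManager(taskScheduler, taskSet, 4, None, clock)
    // ... offers are then made without ever calling clock.advance(...)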





[GitHub] spark pull request #16376: [SPARK-18967][SCHEDULER] compute locality levels ...

2017-01-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16376#discussion_r94835320
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -159,7 +159,12 @@ private[spark] class TaskSetManager(
 addPendingTask(i)
   }
 
-  // Figure out which locality levels we have in our TaskSet, so we can do 
delay scheduling
+  /**
+   * Track the set of locality levels which are valid given the tasks 
locality preferences and
+   * the set of currently available executors.  This is updated as 
executors are added and removed.
+   * This allows a performance optimization, of skipping levels that 
aren't relevant (eg., skip
+   * PROCESS_LOCAL if no tasks could be run PROCESS_LOCAL for the current 
set of executors).
+   */
   var myLocalityLevels = computeValidLocalityLevels()
--- End diff --

As I was figuring out the purpose of this while writing the comment, I made a couple of observations:

1) For each executor we add or remove, it's an O(numExecutors) operation to update the locality levels, so overall it's O(numExecutors^2) to add a bunch of them.  That's minor on small clusters, but I wonder if it becomes an issue when dynamic allocation scales up and down to 1000s of executors.  It's all happening under a lock on the `TaskSchedulerImpl`, too.

2) Though we recompute the valid locality levels as executors come and go, we do *not* as tasks complete.  That's not a problem -- as offers come in, we still go through the right task lists.  But it does make me wonder whether this business of updating the locality levels for the current set of executors is useful at all, and whether we should just always use all levels.
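To illustrate point 1, here's a purely schematic, self-contained sketch of the cost pattern (this is not the Spark code, just the arithmetic):

    object LocalityRecomputeCost {
      def main(args: Array[String]): Unit = {
        val executors = scala.collection.mutable.ArrayBuffer[String]()
        var work = 0L
        (1 to 1000).foreach { i =>
          executors += s"exec-$i"
          work += executors.size  // stand-in for one recompute over all known executors
        }
        // Prints 500500: adding N executors one at a time does O(N^2) total work.
        println(s"total entries scanned after 1000 adds: $work")
      }
    }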





[GitHub] spark issue #16053: [SPARK-17931] Eliminate unncessary task (de) serializati...

2017-01-05 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16053
  
Jenkins, retest this please





[GitHub] spark issue #16053: [SPARK-17931] Eliminate unncessary task (de) serializati...

2017-01-05 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16053
  
lgtm, assuming tests pass





[GitHub] spark issue #16053: [SPARK-17931] Eliminate unncessary task (de) serializati...

2017-01-06 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16053
  
merged to master





[GitHub] spark pull request #16489: [SPARK-18836][MINOR] remove obsolete param doc fo...

2017-01-06 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/16489

[SPARK-18836][MINOR] remove obsolete param doc for metrics

## What changes were proposed in this pull request?

4cb49412d1d7d10ffcc738475928c7de2bc59fd4 removed the `metrics` parameter 
from Task, but left it in the docs -- this removes it from the docs as well.  
(The original commit removed it from the docs for ResultTask and 
ShuffleMapTask, but not the parent Task.)

## How was this patch tested?

jenkins

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark SPARK-18836_doc_fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16489.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16489


commit 974921dca1a74fcb4fff4fdc6c3bdba314f681ec
Author: Imran Rashid 
Date:   2017-01-06T17:15:03Z

remove obsolete param doc







[GitHub] spark issue #16489: [SPARK-18836][MINOR] remove obsolete param doc for metri...

2017-01-06 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16489
  
@shivaram @kayousterhout super minor follow-up to 
https://github.com/apache/spark/pull/16261 that I just noticed.





[GitHub] spark issue #16489: [SPARK-18836][MINOR] remove obsolete param doc for metri...

2017-01-06 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16489
  
oh thanks @srowen, I didn't know it could do that.  I'll update with a larger list of fixes, then.





[GitHub] spark pull request #16489: [SPARK-18836][MINOR] remove obsolete param doc fo...

2017-01-06 Thread squito
Github user squito closed the pull request at:

https://github.com/apache/spark/pull/16489





[GitHub] spark issue #16376: [SPARK-18967][SCHEDULER] compute locality levels even if...

2017-01-09 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16376
  
Jenkins, retest this please





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95198208
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -52,7 +55,36 @@ private[spark] class TaskDescription(
 val addedFiles: Map[String, Long],
 val addedJars: Map[String, Long],
 val properties: Properties,
-val serializedTask: ByteBuffer) {
+private var serializedTask_ : ByteBuffer,
+private var task_ : Task[_] = null) extends  Logging {
--- End diff --

rather than having `null` be the default, I think it might be better to pass it explicitly in `decode`, where you can also comment on why the task isn't deserialized immediately.
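i.e., roughly this in `decode` -- illustrative only, with the fields `decode` already reads elided:

    def decode(byteBuffer: ByteBuffer): TaskDescription = {
      // ... read taskId, attemptNumber, executorId, name, index, files, jars,
      // properties and the serialized task bytes exactly as before ...
      // The task stays in serialized form here: the executor deserializes it itself
      // with the proper class loader. Passing task_ = null explicitly (with this
      // comment) makes that intent clearer than a default parameter value does.
      new TaskDescription(taskId, attemptNumber, executorId, name, index,
        taskFiles, taskJars, properties, serializedTask, task_ = null)
    }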





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95198044
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -52,7 +55,36 @@ private[spark] class TaskDescription(
 val addedFiles: Map[String, Long],
 val addedJars: Map[String, Long],
 val properties: Properties,
-val serializedTask: ByteBuffer) {
+private var serializedTask_ : ByteBuffer,
+private var task_ : Task[_] = null) extends  Logging {
+
+  def this(
+  taskId: Long,
+  attemptNumber: Int,
+  executorId: String,
+  name: String,
+  index: Int, // Index within this task's TaskSet
+  addedFiles: Map[String, Long],
+  addedJars: Map[String, Long],
+  properties: Properties,
+  task: Task[_]) {
+this(taskId, attemptNumber, executorId, name, index,
+  addedFiles, addedJars, properties, null, task)
+  }
+
+  lazy val serializedTask: ByteBuffer = {
+if (serializedTask_ == null) {
+  serializedTask_ = try {
+ByteBuffer.wrap(Utils.serialize(task_))
+  } catch {
+case NonFatal(e) =>
+  val msg = s"Failed to serialize task $taskId, not attempting to 
retry it."
+  logError(msg, e)
+  throw new TaskNotSerializableException(e)
+  }
+}
+serializedTask_
--- End diff --

can you include a comment here about the purpose of this?  Maybe just inside the `if (serializedTask_ == null)`, something like "This is where we serialize the task on the driver before sending it to the executor.  This is not done when creating the TaskDescription so we can postpone this serialization to later in the scheduling process -- particularly, so it can happen on another thread in the CoarseGrainedSchedulerBackend.  On the executors, this will already be populated by `decode`".

Also, I feel like this would be clearer as a `def` rather than a `lazy val`.  Since it's a lazy val, I keep thinking this itself is going to be serialized, and then I have to remind myself it's just used by encode and the serialization is manually managed.  The only advantage of `lazy val` is automatic thread-safety, but that's irrelevant here.
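Putting both suggestions together, roughly -- the same body as in the diff, just as a `def` with the comment folded in:

    def serializedTask: ByteBuffer = {
      if (serializedTask_ == null) {
        // This is where the task is serialized on the driver before being sent to the
        // executor. It is not done when the TaskDescription is created, so that it can
        // happen later in the scheduling process -- in particular, on another thread in
        // the CoarseGrainedSchedulerBackend. On the executors, serializedTask_ is
        // already populated by decode().
        try {
          serializedTask_ = ByteBuffer.wrap(Utils.serialize(task_))
        } catch {
          case NonFatal(e) =>
            logError(s"Failed to serialize task $taskId, not attempting to retry it.", e)
            throw new TaskNotSerializableException(e)
        }
      }
      serializedTask_
    }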





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95198725
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -149,7 +149,12 @@ private[spark] object Utils extends Logging {
 
   /** Deserialize an object using Java serialization and the given 
ClassLoader */
   def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
-val bis = new ByteArrayInputStream(bytes)
+deserialize(ByteBuffer.wrap(bytes), loader)
+  }
+
+  /** Deserialize an object using Java serialization and the given 
ClassLoader */
+  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
+val bis = new ByteBufferInputStream(bytes)
--- End diff --

for other reviewers: I was a little worried that maybe this was 
(ever-so-slightly) changing a really important code path, but looks like this 
method actually is barely used, so not a big deal. (`SparkContext.objectFile` 
and `ReceivedBlockTracker.cleanupBatches`)





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95199529
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -592,47 +579,6 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index 
=== 1)
   }
 
-  test("do not emit warning when serialized task is small") {
--- End diff --

This may be a stretch, but `Task` *could* be big, right?  E.g., if there were a long chain of partitions and for some reason they held a lot of data?  That seems unlikely, but I also have no idea whether it ever happens or not.  It seems easy enough to add the check back in -- is there any reason not to?
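For instance, wherever the serialization now happens, something like the old warning could be restored.  A sketch -- `TASK_SIZE_TO_WARN_KB` is the threshold `TaskSetManager` has historically used, so treat the exact constant as an assumption:

    val serialized = ByteBuffer.wrap(Utils.serialize(task_))
    if (serialized.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) {
      logWarning(s"Stage ${task_.stageId} contains a task of very large size " +
        s"(${serialized.limit() / 1024} KB). The maximum recommended task size is " +
        s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
    }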





[GitHub] spark pull request #16376: [SPARK-18967][SCHEDULER] compute locality levels ...

2017-01-10 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16376#discussion_r95419719
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -54,7 +54,7 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, 
Utils}
 private[spark] class TaskSchedulerImpl private[scheduler](
 val sc: SparkContext,
 val maxTaskFailures: Int,
-blacklistTrackerOpt: Option[BlacklistTracker],
+val blacklistTrackerOpt: Option[BlacklistTracker],
--- End diff --

oh good point, this change is not necessary (must have been part of another 
change which I later reverted, sorry)





[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...

2017-01-10 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/15505
  
@witgo -- right, I'm saying that we should make local mode work to make debugging & development as easy as possible, not to maximize performance.  That means we *should* do the extra serialization in local mode, to mimic the behavior of running on a cluster.

It doesn't look to me like task serialization still happens in local mode.  Tasks only get serialized inside `TaskDescription.encode`, but that is only called by `CoarseGrainedSchedulerBackend`.  Before this change, task serialization happened in [`localScheduler.reviveOffers`](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala#L85), because it happened inside the `TaskSchedulerImpl`.
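For what it's worth, keeping the check in local mode wouldn't even need the full encode path; a simple round trip of the task object before launching it would already surface non-serializable tasks.  A sketch, using the `Utils` helpers shown in the diff above:

    // Fail in local mode the same way a cluster run would if the task isn't serializable.
    val taskBytes = Utils.serialize(task)
    val checkedTask = Utils.deserialize[Task[_]](
      taskBytes, Thread.currentThread.getContextClassLoader)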





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-10 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95423240
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -149,7 +149,12 @@ private[spark] object Utils extends Logging {
 
   /** Deserialize an object using Java serialization and the given 
ClassLoader */
   def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
-val bis = new ByteArrayInputStream(bytes)
+deserialize(ByteBuffer.wrap(bytes), loader)
+  }
+
+  /** Deserialize an object using Java serialization and the given 
ClassLoader */
+  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
+val bis = new ByteBufferInputStream(bytes)
--- End diff --

I think you misunderstood my comment.  I agree, this change is fine.  Its correctness is clear, and it's definitely covered by tests.

My initial concern was that this might be some super-common code path, where even a slight modification might affect performance (it should be optimized away by the JIT anyway, but I thought it was worth thinking about).

Anyway, first of all it was a really minor concern, and second of all it turned out to be totally irrelevant, since it's *not* a commonly used method.  I was just trying to save other reviewers from tracking this down themselves.  Sorry for the confusion :)





[GitHub] spark pull request #16376: [SPARK-18967][SCHEDULER] compute locality levels ...

2017-01-10 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16376#discussion_r95424970
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -54,7 +54,7 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, 
Utils}
 private[spark] class TaskSchedulerImpl private[scheduler](
 val sc: SparkContext,
 val maxTaskFailures: Int,
-blacklistTrackerOpt: Option[BlacklistTracker],
+val blacklistTrackerOpt: Option[BlacklistTracker],
--- End diff --

doh, actually this is used in tests.  I override `createTaskSetManager` to use my manual clock.  Another alternative is to allow the clock to be passed in to the `TaskSchedulerImpl` constructor, but that ends up being a bigger code change.

I'll make it a `private[scheduler] val`, slightly tighter than the implicit `private[spark]` it would be otherwise.





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-10 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95433750
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -52,7 +55,36 @@ private[spark] class TaskDescription(
 val addedFiles: Map[String, Long],
 val addedJars: Map[String, Long],
 val properties: Properties,
-val serializedTask: ByteBuffer) {
+private var serializedTask_ : ByteBuffer,
+private var task_ : Task[_] = null) extends  Logging {
--- End diff --

@kayousterhout I like that idea, but I took a brief look at how much would have to change, and it's a ton of changes, since `TaskSchedulerImpl.resourceOffers` would now be returning both `TaskDescription`s and `Task`s, which then leads to small changes in a lot more places.  I'm torn, but inclined to agree with @witgo.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16376: [SPARK-18967][SCHEDULER] compute locality levels ...

2017-01-10 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16376#discussion_r95446452
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -819,4 +819,84 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
 assert(!taskScheduler.hasExecutorsAliveOnHost("host0"))
 assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty)
   }
+
+  test("Locality should be used for bulk offers even with delay scheduling 
off") {
+// for testing, we create a task scheduler which lets us control how 
offers are shuffled
+val conf = new SparkConf()
+  .set("spark.locality.wait", "0")
+sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
+// we create a manual clock just so we can be sure the clock doesn't 
advance at all in this test
+val clock = new ManualClock()
+
+// We customize the task scheduler just to let us control the way 
offers are shuffled, so we
+// can be sure we try both permutations, and to control the clock on 
the tasksetmanager.
+val taskScheduler = new TaskSchedulerImpl(sc) {
+  override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): 
IndexedSeq[WorkerOffer] = {
+// Don't shuffle the offers around for this test.  Instead, we'll 
just pass in all
+// the permutations we care about directly.
+offers
+  }
+  override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: 
Int): TaskSetManager = {
+new TaskSetManager(this, taskSet, maxTaskFailures, 
blacklistTrackerOpt, clock)
+  }
+}
+// Need to initialize a DAGScheduler for the taskScheduler to use for 
callbacks.
+new DAGScheduler(sc, taskScheduler) {
+  override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+  override def executorAdded(execId: String, host: String) {}
+}
+taskScheduler.initialize(new FakeSchedulerBackend)
+
+
+// Make two different offers -- one in the preferred location, one 
that is not.
+val offers = IndexedSeq(
+  WorkerOffer("exec1", "host1", 1),
+  WorkerOffer("exec2", "host2", 1)
+)
+Seq(false, true).foreach { swapOrder =>
+  // Submit a taskset with locality preferences.
+  val taskSet = FakeTask.createTaskSet(
+1, stageId = 1, stageAttemptId = 0, Seq(TaskLocation("host1", 
"exec1")))
+  taskScheduler.submitTasks(taskSet)
+  val shuffledOffers = if (swapOrder) offers.reverse else offers
+  // Regardless of the order of the offers (after the task scheduler 
shuffles them), we should
+  // always take advantage of the local offer.
+  val taskDescs = taskScheduler.resourceOffers(shuffledOffers).flatten
+  withClue(s"swapOrder = $swapOrder") {
+assert(taskDescs.size === 1)
+assert(taskDescs.head.executorId === "exec1")
+  }
+}
+  }
+
+  test("With delay scheduling off, tasks can be run at any locality level 
immediately") {
--- End diff --

yes, you are absolutely right.  I've updated it and also added a check to make sure the TaskSetManager includes the lower locality levels.





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-10 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95448131
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -52,7 +55,36 @@ private[spark] class TaskDescription(
 val addedFiles: Map[String, Long],
 val addedJars: Map[String, Long],
 val properties: Properties,
-val serializedTask: ByteBuffer) {
+private var serializedTask_ : ByteBuffer,
+private var task_ : Task[_] = null) extends  Logging {
--- End diff --

yeah, that leads to ~100 compile errors in test code.  It's all small changes, but still, I'm not sure there is much value.  I don't understand the comment on LaunchTask -- your commit does not change it, right?

In any case, if you feel like we should do that refactor, I am not opposed.





[GitHub] spark issue #16376: [SPARK-18967][SCHEDULER] compute locality levels even if...

2017-01-11 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16376
  
Jenkins, retest this please





[GitHub] spark pull request #16521: [SPARK-19139][core] New auth mechanism for transp...

2017-01-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16521#discussion_r95625631
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/crypto/README.md 
---
@@ -0,0 +1,158 @@
+Spark Auth Protocol and AES Encryption Support
+==
+
+This file describes an auth protocol used by Spark as a more secure 
alternative to DIGEST-MD5. This
+protocol is built on symmetric key encryption, based on the assumption 
that the two endpoints being
+authenticated share a common secret, which is how Spark authentication 
currently works. The protocol
+provides mutual authentication, meaning that after the negotiation both 
parties know that the remote
+side knows the shared secret. The protocol is influenced by the ISO/IEC 
9798 protocol, although it's
+not an implementation of it.
+
+This protocol could be replaced with TLS PSK, except no PSK ciphers are 
available in the currently
+released JREs.
+
+The protocol aims at solving the following shortcomings in Spark's current 
usage of DIGEST-MD5:
+
+- MD5 is an aging hash algorithm with known weaknesses, and a more secure 
alternative is desired.
+- DIGEST-MD5 has a pre-defined set of ciphers for which it can generate 
keys. The only
+  viable, supported cipher these days is 3DES, and a more modern 
alternative is desired.
+- Encrypting AES session keys with 3DES doesn't solve the issue, since the 
weakest link
+  in the negotiation would still be MD5 and 3DES.
+
+The protocol assumes that the shared secret is generated and distributed 
in a secure manner.
+
+The protocol always negotiates encryption keys. If encryption is not 
desired, the existing
+SASL-based authentication, or no authentication at all, can be chosen 
instead.
+
+When messages are described below, it's expected that the implementation 
should support
+arbitrary sizes for fields that don't have a fixed size.
+
+Client Challenge
+
+
+The auth negotiation is started by the client. The client starts by 
generating an encryption
+key based on the application's shared secret, and a nonce.
+
+KEY = KDF(SECRET, SALT, KEY_LENGTH)
+
+Where:
+- KDF(): a key derivation function that takes a secret, a salt, a 
configurable number of
+  iterations, and a configurable key length.
+- SALT: a byte sequence used to salt the key derivation function.
+- KEY_LENGTH: length of the encryption key to generate.
+
+
+The client generates a message with the following content:
+
+CLIENT_CHALLENGE = (
+APP_ID,
+KDF,
+ITERATIONS,
+CIPHER,
+KEY_LENGTH,
+ANONCE,
+ENC(APP_ID || ANONCE || CHALLENGE))
+
+Where:
+
+- APP_ID: the application ID which the server uses to identify the shared 
secret.
+- KDF: the key derivation function described above.
+- ITERATIONS: number of iterations to run the KDF when generating keys.
+- CIPHER: the cipher used to encrypt data.
+- KEY_LENGTH: length of the encryption keys to generate, in bits.
+- ANONCE: the nonce used as the salt when generating the auth key.
+- ENC(): an encryption function that uses the cipher and the generated 
key. This function
+  will also be used in the definition of other messages below.
+- CCHALLENGE: a byte sequence used as a challenge to the server.
+- ||: concatenation operator.
+
+When strings are used where byte arrays are expected, the UTF-8 
representation of the string
+is assumed.
+
+To respond to the challenge, the server should consider the byte array as 
representing an
+arbitrary-length integer, and respond with the value of the integer plus 
one.
+
+
+Server Response And Challenge
+-
+
+Once the client challenge is received, the server will generate the same 
auth key by
+using the same algorithm the client has used. It will then verify the 
client challenge:
+if the APP_ID and ANONCE fields match, the server knows that the client 
has the shared
+secret. The server then creates a response to the client challenge, to 
prove that it also
+has the secret key, and provides parameters to be used when creating the 
session key.
+
+The following describes the response from the server:
+
+SERVER_CHALLENGE = (
+ENC(APP_ID || ANONCE || RESPONSE),
+ENC(SNONCE),
+ENC(INIV),
+ENC(OUTIV))
+
+Where:
+
+- CRESPONSE: the server's response to the client challenge.
+- SNONCE: a nonce to be used as salt when generating the session key.
+- INIV: initialization vector used to initialize the input c

[GitHub] spark pull request #16521: [SPARK-19139][core] New auth mechanism for transp...

2017-01-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16521#discussion_r95625434
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/crypto/README.md 
---
@@ -0,0 +1,158 @@
+Spark Auth Protocol and AES Encryption Support
+==
+
+This file describes an auth protocol used by Spark as a more secure 
alternative to DIGEST-MD5. This
+protocol is built on symmetric key encryption, based on the assumption 
that the two endpoints being
+authenticated share a common secret, which is how Spark authentication 
currently works. The protocol
+provides mutual authentication, meaning that after the negotiation both 
parties know that the remote
+side knows the shared secret. The protocol is influenced by the ISO/IEC 
9798 protocol, although it's
+not an implementation of it.
+
+This protocol could be replaced with TLS PSK, except no PSK ciphers are 
available in the currently
+released JREs.
+
+The protocol aims at solving the following shortcomings in Spark's current 
usage of DIGEST-MD5:
+
+- MD5 is an aging hash algorithm with known weaknesses, and a more secure 
alternative is desired.
+- DIGEST-MD5 has a pre-defined set of ciphers for which it can generate 
keys. The only
+  viable, supported cipher these days is 3DES, and a more modern 
alternative is desired.
+- Encrypting AES session keys with 3DES doesn't solve the issue, since the 
weakest link
+  in the negotiation would still be MD5 and 3DES.
+
+The protocol assumes that the shared secret is generated and distributed 
in a secure manner.
+
+The protocol always negotiates encryption keys. If encryption is not 
desired, the existing
+SASL-based authentication, or no authentication at all, can be chosen 
instead.
+
+When messages are described below, it's expected that the implementation 
should support
+arbitrary sizes for fields that don't have a fixed size.
+
+Client Challenge
+
+
+The auth negotiation is started by the client. The client starts by 
generating an encryption
+key based on the application's shared secret, and a nonce.
+
+KEY = KDF(SECRET, SALT, KEY_LENGTH)
+
+Where:
+- KDF(): a key derivation function that takes a secret, a salt, a 
configurable number of
+  iterations, and a configurable key length.
+- SALT: a byte sequence used to salt the key derivation function.
+- KEY_LENGTH: length of the encryption key to generate.
+
+
+The client generates a message with the following content:
+
+CLIENT_CHALLENGE = (
+APP_ID,
+KDF,
+ITERATIONS,
+CIPHER,
+KEY_LENGTH,
+ANONCE,
+ENC(APP_ID || ANONCE || CHALLENGE))
+
+Where:
+
+- APP_ID: the application ID which the server uses to identify the shared 
secret.
+- KDF: the key derivation function described above.
+- ITERATIONS: number of iterations to run the KDF when generating keys.
+- CIPHER: the cipher used to encrypt data.
+- KEY_LENGTH: length of the encryption keys to generate, in bits.
+- ANONCE: the nonce used as the salt when generating the auth key.
+- ENC(): an encryption function that uses the cipher and the generated 
key. This function
+  will also be used in the definition of other messages below.
+- CCHALLENGE: a byte sequence used as a challenge to the server.
+- ||: concatenation operator.
+
+When strings are used where byte arrays are expected, the UTF-8 
representation of the string
+is assumed.
+
+To respond to the challenge, the server should consider the byte array as 
representing an
+arbitrary-length integer, and respond with the value of the integer plus 
one.
+
+
+Server Response And Challenge
+-
+
+Once the client challenge is received, the server will generate the same 
auth key by
+using the same algorithm the client has used. It will then verify the 
client challenge:
+if the APP_ID and ANONCE fields match, the server knows that the client 
has the shared
+secret. The server then creates a response to the client challenge, to 
prove that it also
+has the secret key, and provides parameters to be used when creating the 
session key.
+
+The following describes the response from the server:
+
+SERVER_CHALLENGE = (
+ENC(APP_ID || ANONCE || RESPONSE),
+ENC(SNONCE),
+ENC(INIV),
+ENC(OUTIV))
+
+Where:
+
+- CRESPONSE: the server's response to the client challenge.
--- End diff --

typo: RESPONSE
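
(Aside from the typo, the challenge/response rule quoted above is easy to illustrate.  A minimal sketch, treating the challenge bytes as a signed big-endian integer; the names are made up for the example and this is not the PR's code:)

```scala
// Sketch of the rule in the README above: interpret the (decrypted) challenge bytes
// as an arbitrary-length integer and answer with that value plus one.
import java.math.BigInteger

object ChallengeResponseSketch {
  def respond(challenge: Array[Byte]): Array[Byte] =
    new BigInteger(challenge).add(BigInteger.ONE).toByteArray

  def main(args: Array[String]): Unit = {
    val challenge = BigInteger.valueOf(41L).toByteArray
    println(new BigInteger(respond(challenge)))  // prints 42
  }
}
```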



[GitHub] spark pull request #16521: [SPARK-19139][core] New auth mechanism for transp...

2017-01-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16521#discussion_r95624898
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/crypto/README.md 
---
@@ -0,0 +1,158 @@
+Spark Auth Protocol and AES Encryption Support
+==
+
+This file describes an auth protocol used by Spark as a more secure 
alternative to DIGEST-MD5. This
+protocol is built on symmetric key encryption, based on the assumption 
that the two endpoints being
+authenticated share a common secret, which is how Spark authentication 
currently works. The protocol
+provides mutual authentication, meaning that after the negotiation both 
parties know that the remote
+side knows the shared secret. The protocol is influenced by the ISO/IEC 
9798 protocol, although it's
+not an implementation of it.
+
+This protocol could be replaced with TLS PSK, except no PSK ciphers are 
available in the currently
+released JREs.
+
+The protocol aims at solving the following shortcomings in Spark's current 
usage of DIGEST-MD5:
+
+- MD5 is an aging hash algorithm with known weaknesses, and a more secure 
alternative is desired.
+- DIGEST-MD5 has a pre-defined set of ciphers for which it can generate 
keys. The only
+  viable, supported cipher these days is 3DES, and a more modern 
alternative is desired.
+- Encrypting AES session keys with 3DES doesn't solve the issue, since the 
weakest link
+  in the negotiation would still be MD5 and 3DES.
+
+The protocol assumes that the shared secret is generated and distributed 
in a secure manner.
+
+The protocol always negotiates encryption keys. If encryption is not 
desired, the existing
+SASL-based authentication, or no authentication at all, can be chosen 
instead.
+
+When messages are described below, it's expected that the implementation 
should support
+arbitrary sizes for fields that don't have a fixed size.
+
+Client Challenge
+
+
+The auth negotiation is started by the client. The client starts by 
generating an encryption
+key based on the application's shared secret, and a nonce.
+
+KEY = KDF(SECRET, SALT, KEY_LENGTH)
+
+Where:
+- KDF(): a key derivation function that takes a secret, a salt, a 
configurable number of
+  iterations, and a configurable key length.
+- SALT: a byte sequence used to salt the key derivation function.
+- KEY_LENGTH: length of the encryption key to generate.
+
+
+The client generates a message with the following content:
+
+CLIENT_CHALLENGE = (
+APP_ID,
+KDF,
+ITERATIONS,
+CIPHER,
+KEY_LENGTH,
+ANONCE,
+ENC(APP_ID || ANONCE || CHALLENGE))
+
+Where:
+
+- APP_ID: the application ID which the server uses to identify the shared 
secret.
+- KDF: the key derivation function described above.
+- ITERATIONS: number of iterations to run the KDF when generating keys.
+- CIPHER: the cipher used to encrypt data.
+- KEY_LENGTH: length of the encryption keys to generate, in bits.
+- ANONCE: the nonce used as the salt when generating the auth key.
+- ENC(): an encryption function that uses the cipher and the generated 
key. This function
+  will also be used in the definition of other messages below.
+- CCHALLENGE: a byte sequence used as a challenge to the server.
--- End diff --

typo: CHALLENGE
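
(On the `KEY = KDF(SECRET, SALT, KEY_LENGTH)` step quoted above: a minimal sketch of the key derivation, assuming PBKDF2-HMAC-SHA256 as the KDF and AES as the cipher.  Both are negotiable parameters in the protocol, and the names below are illustrative rather than the PR's implementation:)

```scala
// Derive the auth key from the shared secret and the nonce used as salt (ANONCE).
// KDF choice, iteration count and key length are all fields of CLIENT_CHALLENGE.
import java.security.SecureRandom
import javax.crypto.SecretKeyFactory
import javax.crypto.spec.{PBEKeySpec, SecretKeySpec}

object KeyDerivationSketch {
  def deriveKey(secret: Array[Char], salt: Array[Byte], iterations: Int, keyLenBits: Int): SecretKeySpec = {
    val kdf = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
    val keyBytes = kdf.generateSecret(new PBEKeySpec(secret, salt, iterations, keyLenBits)).getEncoded
    new SecretKeySpec(keyBytes, "AES")
  }

  def main(args: Array[String]): Unit = {
    val anonce = new Array[Byte](16)                 // ANONCE, reused as the KDF salt
    new SecureRandom().nextBytes(anonce)
    val authKey = deriveKey("app-secret".toCharArray, anonce, 10000, 128)
    // ENC(APP_ID || ANONCE || CHALLENGE) would then use this key with the chosen cipher.
    println(s"derived ${authKey.getEncoded.length * 8}-bit auth key")
  }
}
```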





[GitHub] spark pull request #16521: [SPARK-19139][core] New auth mechanism for transp...

2017-01-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16521#discussion_r95661186
  
--- Diff: docs/configuration.md ---
@@ -1625,40 +1625,40 @@ Apart from these, the following properties are also 
available, and may be useful
   
 
 
-  spark.authenticate.enableSaslEncryption
+  spark.network.crypto.enabled
   false
   
-Enable encrypted communication when authentication is
-enabled. This is supported by the block transfer service and the
-RPC endpoints.
+Enable encryption using the commons-crypto library for RPC and block 
transfer service.
+Requires spark.authenticate to be enabled.
--- End diff --

if `spark.authenticate=false`, what happens if this is `true`?  It looks like it is just ignored; I think fail-fast would be ideal.
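
A fail-fast check could be as small as the sketch below (the placement and object name are hypothetical, and the key strings are just the ones from the docs -- this is not the PR's code):

```scala
// Sketch of a fail-fast validation for the two settings discussed above.
import org.apache.spark.SparkConf

object CryptoConfCheckSketch {
  def validate(conf: SparkConf): Unit = {
    val cryptoEnabled = conf.getBoolean("spark.network.crypto.enabled", false)
    val authEnabled = conf.getBoolean("spark.authenticate", false)
    require(!cryptoEnabled || authEnabled,
      "spark.network.crypto.enabled requires spark.authenticate to be enabled")
  }
}
```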





[GitHub] spark pull request #16346: [SPARK-16654][CORE] Add UI coverage for Applicati...

2017-01-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16346#discussion_r95725456
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
@@ -88,6 +88,86 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
 new TaskSetBlacklist(conf, stageId, clock)
   }
 
+  def configureBlacklistAndScheduler(confs: (String, String)*): Unit = {
+conf = new SparkConf().setAppName("test").setMaster("local")
+  .set(config.BLACKLIST_ENABLED.key, "true")
+confs.foreach { case (k, v) => conf.set(k, v) }
+
+clock.setTime(0)
+listenerBusMock = mock[LiveListenerBus]
+blacklist = new BlacklistTracker(listenerBusMock, conf, clock)
+  }
+
+  test("Blacklisting individual tasks and checking for 
SparkListenerEvents") {
+configureBlacklistAndScheduler()
--- End diff --

I think the rest of the test code here has changed a lot since you first 
started working on this, but I'd avoid putting in a whole test case just for 
this.  Instead, I'd look at the existing test cases, find all occurrences of 
`blacklist.isNodeBlacklisted` and `blacklist.isExecutorBlacklisted`, and add 
appropriate `verify(listenerBusMock)` calls around all of those.
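
Something along these lines, for example (a sketch only -- the event fields and `failuresPerExec` are placeholders, not the suite's actual values):

```scala
// Next to an existing assertion, also verify that the corresponding event was posted
// on the mocked LiveListenerBus (Mockito-style verification).
assert(blacklist.isExecutorBlacklisted("1"))
verify(listenerBusMock).post(
  SparkListenerExecutorBlacklisted(clock.getTimeMillis(), "1", failuresPerExec))
```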





[GitHub] spark pull request #16346: [SPARK-16654][CORE] Add UI coverage for Applicati...

2017-01-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16346#discussion_r95724900
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
@@ -88,6 +88,86 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
 new TaskSetBlacklist(conf, stageId, clock)
   }
 
+  def configureBlacklistAndScheduler(confs: (String, String)*): Unit = {
--- End diff --

since you're never passing in `confs`, couldn't you get rid of this method, 
and just change the initialization in `beforeEach` to

```
listenerBusMock = mock[LiveListenerBus]
blacklist = new BlacklistTracker(listenerBusMock, conf, clock)
```





[GitHub] spark pull request #16346: [SPARK-16654][CORE] Add UI coverage for Applicati...

2017-01-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16346#discussion_r95724311
  
--- Diff: core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala 
---
@@ -157,4 +158,42 @@ class ExecutorsListener(storageStatusListener: 
StorageStatusListener, conf: Spar
 }
   }
 
+  private def updateExecutorBlacklist(eid: String, isBlacklisted: 
Boolean): Unit = {
+val execTaskSummary = executorToTaskSummary.getOrElseUpdate(eid, 
ExecutorTaskSummary(eid))
+execTaskSummary.isBlacklisted = isBlacklisted
+  }
+
+  override def onExecutorBlacklisted(executorBlacklisted: 
SparkListenerExecutorBlacklisted)
+  : Unit = synchronized {
--- End diff --

nit: move param to second line, and double-indent.  (and the other added 
methods in this file)

```scala
override def onExecutorBlacklisted(
executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = 
synchronized {
```





[GitHub] spark issue #16346: [SPARK-16654][CORE] Add UI coverage for Application Leve...

2017-01-11 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16346
  
cc @ajbozarth as you were interested in this earlier





[GitHub] spark issue #16346: [SPARK-16654][CORE] Add UI coverage for Application Leve...

2017-01-12 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16346
  
@ajbozarth whoops, I hadn't noticed the extra "Blacklisted" column in the 
summary table at the top -- I was thinking we'd add another row for blacklisted 
executors.  But I actually think the current version is better.  So ignore that 
comment :) (mentioned the same to @jsoltren directly)





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r96080614
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -52,7 +55,43 @@ private[spark] class TaskDescription(
 val addedFiles: Map[String, Long],
 val addedJars: Map[String, Long],
 val properties: Properties,
-val serializedTask: ByteBuffer) {
+private var serializedTask_ : ByteBuffer) extends  Logging {
--- End diff --

ah, that last one is doing the full refactor and fixing the 100+ compile 
errors :)

Here is yet another version, which I think is more like what Kay was suggesting: 
https://github.com/squito/spark/commit/389fec55de99e9f12104a8ba2b7aa54278aadd93

I agree with Kay that the `serializedTask` member var is a little confusing in the version in this PR now.  No strong preference between the alternatives; the full separation is the cleanest, but the change is pretty big (from a quick glance, I'm not sure all those test changes are correct, but I didn't look closely).





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r96082369
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
 ---
@@ -17,11 +17,37 @@
 
 package org.apache.spark.scheduler
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkException, SparkFunSuite}
+import java.io.{IOException, NotSerializableException, ObjectInputStream, 
ObjectOutputStream}
+
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
 import org.apache.spark.util.{RpcUtils, SerializableBuffer}
 
-class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with 
LocalSparkContext {
+class NotSerializablePartitionRDD(
+  sc: SparkContext,
+  numPartitions: Int) extends RDD[(Int, Int)](sc, Nil) with Serializable {
--- End diff --

nit: double indent params





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r96082959
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -244,32 +245,45 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 // Launch tasks returned by a set of resource offers
 private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
   for (task <- tasks.flatten) {
-val serializedTask = TaskDescription.encode(task)
-if (serializedTask.limit >= maxRpcMessageSize) {
-  scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { 
taskSetMgr =>
-try {
-  var msg = "Serialized task %s:%d was %d bytes, which exceeds 
max allowed: " +
-"spark.rpc.message.maxSize (%d bytes). Consider increasing 
" +
-"spark.rpc.message.maxSize or using broadcast variables 
for large values."
-  msg = msg.format(task.taskId, task.index, 
serializedTask.limit, maxRpcMessageSize)
-  taskSetMgr.abort(msg)
-} catch {
-  case e: Exception => logError("Exception in error callback", 
e)
-}
-  }
+val serializedTask = try {
+  TaskDescription.encode(task)
+} catch {
+  case NonFatal(e) =>
+abortTaskSetManager(scheduler, task.taskId,
+  s"Failed to serialize task ${task.taskId}, not attempting to 
retry it.", Some(e))
+null
 }
-else {
+
+if (serializedTask != null && serializedTask.limit >= 
maxRpcMessageSize) {
+  val msg = "Serialized task %s:%d was %d bytes, which exceeds max 
allowed: " +
+"spark.rpc.message.maxSize (%d bytes). Consider increasing " +
+"spark.rpc.message.maxSize or using broadcast variables for 
large values."
+  abortTaskSetManager(scheduler, task.taskId,
+msg.format(task.taskId, task.index, serializedTask.limit, 
maxRpcMessageSize))
+} else if (serializedTask != null) {
+  if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 
1024) {
--- End diff --

this nested if can be combined into the `else if`, right?





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r96083494
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -244,32 +245,45 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 // Launch tasks returned by a set of resource offers
 private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
   for (task <- tasks.flatten) {
-val serializedTask = TaskDescription.encode(task)
-if (serializedTask.limit >= maxRpcMessageSize) {
-  scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { 
taskSetMgr =>
-try {
-  var msg = "Serialized task %s:%d was %d bytes, which exceeds 
max allowed: " +
-"spark.rpc.message.maxSize (%d bytes). Consider increasing 
" +
-"spark.rpc.message.maxSize or using broadcast variables 
for large values."
-  msg = msg.format(task.taskId, task.index, 
serializedTask.limit, maxRpcMessageSize)
-  taskSetMgr.abort(msg)
-} catch {
-  case e: Exception => logError("Exception in error callback", 
e)
-}
-  }
+val serializedTask = try {
+  TaskDescription.encode(task)
+} catch {
+  case NonFatal(e) =>
+abortTaskSetManager(scheduler, task.taskId,
--- End diff --

hmm, what happens here is: *one* of the tasks can't be serialized (or is too big, etc.).  We'll abort the taskset, but won't we still send out the `LaunchTask` events for all the other tasks?  That wouldn't happen before.

This may actually be a big problem -- we might lose the performance benefits if you first have to create all the serialized tasks to make sure they all work, and then send all the messages.

Maybe it's OK to still have previous tasks start, but it seems like we should at least prevent any *more* tasks from starting, or stop trying to serialize every other task.  I just modified the test case in `CoarseGrainedSchedulerBackendSuite` so it waits for multiple executors to come up before submitting the bad job, and the logs do show 2 instances of

```
17/01/13 16:12:03.213 dispatcher-event-loop-3 ERROR TaskDescription: Failed 
to serialize task 2, not attempting to retry it.
java.io.NotSerializableException
at 
org.apache.spark.scheduler.NotSerializablePartitionRDD$$anonfun$getPartitions$1$$anon$1.writeObject(CoarseGrainedSchedulerBackendSuite.scala:38)
```
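
One way to get that "stop early" behaviour, sketched generically (this is not the PR's `launchTasks`, just the shape of the loop, with the abort call elided):

```scala
// If one item in a batch fails to serialize, stop serializing and launching the
// remaining items instead of carrying on. Names here are illustrative only.
import scala.util.control.NonFatal

object StopOnFirstFailureSketch {
  def launchAll[T](tasks: Seq[T])(encode: T => Array[Byte])(launch: Array[Byte] => Unit): Unit = {
    val it = tasks.iterator
    var aborted = false
    while (!aborted && it.hasNext) {
      try {
        launch(encode(it.next()))
      } catch {
        case NonFatal(_) =>
          // the real code would abort the task set here; either way, stop the loop
          aborted = true
      }
    }
  }

  def main(args: Array[String]): Unit = {
    launchAll(Seq("a", "b", "boom", "c")) { t =>
      if (t == "boom") throw new RuntimeException("cannot serialize") else t.getBytes("UTF-8")
    } { bytes =>
      println(s"launched task of ${bytes.length} bytes")
    }
    // prints two lines, then stops at "boom" without ever touching "c"
  }
}
```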





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r96082685
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -517,6 +518,32 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
 assertDataStructuresEmpty()
   }
 
+  test("unserializable partition") {
+val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+val shuffleDep = new ShuffleDependency(shuffleMapRdd, new Partitioner {
+  override def numPartitions = 1
+
+  override def getPartition(key: Any) = 1
+
+  @throws(classOf[IOException])
+  private def writeObject(out: ObjectOutputStream): Unit = {
+throw new NotSerializableException()
+  }
+
+  @throws(classOf[IOException])
+  private def readObject(in: ObjectInputStream): Unit = {}
+})
+
+// Submit a map stage by itself
+submitMapStage(shuffleDep)
+assert(failure.getMessage.startsWith(
+  "Job aborted due to stage failure: Task not serializable"))
+sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+assert(sparkListener.failedStages.contains(0))
+assert(sparkListener.failedStages.size === 1)
+assertDataStructuresEmpty()
--- End diff --

I don't understand; what do you mean by "see above"?  If you are referring to the "unserializable task" test -- good point, that test should be fixed too :)





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r96082178
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
 ---
@@ -38,4 +64,17 @@ class CoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite with LocalSparkCo
 assert(smaller.size === 4)
   }
 
+  test("Scheduler aborts stages that have unserializable partition") {
+val conf = new SparkConf()
+  .setMaster("local-cluster[2, 1, 1024]")
+  .setAppName("test")
+  .set("spark.dynamicAllocation.testing", "true")
+sc = new SparkContext(conf)
+val myRDD = new NotSerializablePartitionRDD(sc, 2)
+val e = intercept[SparkException] {
+  myRDD.count()
+}
+assert(e.getMessage.contains("Failed to serialize task"))
+
--- End diff --

you should add a check here that, after this, you can still submit OK tasks, e.g. just `sc.parallelize(1 to 10).count()` (similar to Kay's comment about DAGSchedulerSuite).





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r96082398
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -602,6 +616,20 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 Future.successful(false)
 }
 
-private[spark] object CoarseGrainedSchedulerBackend {
+private[spark] object CoarseGrainedSchedulerBackend extends Logging {
   val ENDPOINT_NAME = "CoarseGrainedScheduler"
+  // abort TaskSetManager without exception
+  def abortTaskSetManager(
+scheduler: TaskSchedulerImpl,
+taskId: Long,
+msg: => String,
+exception: Option[Throwable] = None): Unit = {
--- End diff --

nit: double indent params





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r96105066
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -52,7 +55,43 @@ private[spark] class TaskDescription(
 val addedFiles: Map[String, Long],
 val addedJars: Map[String, Long],
 val properties: Properties,
-val serializedTask: ByteBuffer) {
+private var serializedTask_ : ByteBuffer) extends  Logging {
--- End diff --

@witgo yes, feel free to pull that change into this PR





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-10-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81859662
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[scheduler] object BlacklistTracker extends Logging {
+
+  private val DEFAULT_TIMEOUT = "1h"
+
+  /**
+   * Returns true if the blacklist is enabled, based on checking the 
configuration in the following
+   * order:
+   * 1. Is it specifically enabled or disabled?
+   * 2. Is it enabled via the legacy timeout conf?
+   * 3. Use the default for the spark-master:
+   *   - off for local mode
+   *   - on for distributed modes (including local-cluster)
+   */
+  def isBlacklistEnabled(conf: SparkConf): Boolean = {
+conf.get(config.BLACKLIST_ENABLED) match {
+  case Some(isEnabled) =>
+isEnabled
+  case None =>
+// if they've got a non-zero setting for the legacy conf, always 
enable the blacklist,
+// otherwise, use the default based on the cluster-mode (off for 
local-mode, on otherwise).
+val legacyKey = config.BLACKLIST_LEGACY_TIMEOUT_CONF.key
+conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF) match {
+  case Some(legacyTimeout) =>
+if (legacyTimeout == 0) {
+  logWarning(s"Turning off blacklisting due to legacy 
configuaration:" +
+s" $legacyKey == 0")
+  false
+} else {
+  // mostly this is necessary just for tests, since real users 
that want the blacklist
+  // will get it anyway by default
+  logWarning(s"Turning on blacklisting due to legacy 
configuration:" +
+s" $legacyKey > 0")
+  true
+}
+  case None =>
+// local-cluster is *not* considered local for these purposes, 
we still want the
+// blacklist enabled by default
+!Utils.isLocalMaster(conf)
+}
+}
+  }
+
+  def getBlacklistTimeout(conf: SparkConf): Long = {
+conf.get(config.BLACKLIST_TIMEOUT_CONF).getOrElse {
+  conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF).getOrElse {
+Utils.timeStringAsMs(DEFAULT_TIMEOUT)
+  }
+}
+  }
+
+  /**
+   * Verify that blacklist configurations are consistent; if not, throw an 
exception.  Should only
+   * be called if blacklisting is enabled.
+   *
+   * The configuration for the blacklist is expected to adhere to a few 
invariants.  Default
+   * values follow these rules of course, but users may unwittingly change 
one configuration
+   * without making the corresponding adjustment elsewhere.  This ensures 
we fail-fast when
+   * there are such misconfigurations.
+   */
+  def validateBlacklistConfs(conf: SparkConf): Unit = {
+
+def mustBePos(k: String, v: String): Unit = {
+  throw new IllegalArgumentException(s"$k was $v, but must be > 0.")
+}
+
+// undocumented escape hatch for validation -- just for tests that 
want to run in an "unsafe"
+// configuration.
+if (!conf.get("spark.blacklist.testing.skipValidation", 
"false").toBoolean) {
+
+  Seq(
+config.MAX_TASK_ATTEMPTS_PER_EXECUTOR,
+config.MAX_TASK_ATTEMPTS_PER_NODE,
+config.MAX_FAILURES_PER_EXEC_STAGE,
+config.MAX_FAILED_EXEC_PER_NODE_STAGE
+  ).foreach { config =>
+val v = conf.get(config)
+if (v <= 0) {
+  mustBeP

[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-10-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81861857
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala 
---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.HashMap
+
+/**
+ * Small helper for tracking failed tasks for blacklisting purposes.  Info 
on all failures for one
+ * task set, within one task set.
+ */
+class ExecutorFailuresInTaskSet(val node: String) {
--- End diff --

I'd like to push back on this and keep this naming.  I think I actually came up with this name from an earlier comment of yours about being confused about some variables and asking for more particular names :) (maybe it was for other variables).

the naming could just be on instance variables and not on the class itself of course, but I think it's helpful in this case to have it on the class.  I found even I got easily confused about the purpose of various little helper classes when I had to come back to this patch after a couple of days.  Where it's used generally told me more about the important design tradeoffs -- e.g., if it's in a taskset, it'll get dropped when the taskset completes, which automatically takes care of memory leaks.  Maybe at some point in the future it'll be used for something else, but it seems like that change could then do the appropriate renaming.  I feel like the name would also prompt that later change to check the existing assumptions about the scope of this helper.

It doesn't work as a nested class in the larger blacklist patch as I currently have it, so since you don't feel strongly I'll just leave it at the top level (but I will make it `private[scheduler]`, which it should have been already).





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-10-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81864312
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -98,6 +84,14 @@ private[spark] class TaskSetManager(
   var totalResultSize = 0L
   var calculatedTasks = 0
 
+  val taskSetBlacklistOpt: Option[TaskSetBlacklist] = {
--- End diff --

yeah, the "taskSet" is there because this code needs both in the later PR.  The taskSetManager needs to communicate some info back to the app-level blacklist, so it needs to work with both.  I could ignore it here and we can deal with that naming issue when we get there, if you want.

I go back and forth on liking the "Opt" suffix ... I'm happy to go either way, but I often like "Opt" so that the naming & meaning is clear for code like

```scala
fooOpt.foreach { foo =>
 ...
}
```
    
    eg. 
https://github.com/squito/spark/blob/taskset_blacklist_only/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L594

the naming pattern is used elsewhere in spark:

```
> find . -name "*.scala" | xargs grep "val .*Opt ="
./core/src/main/scala/org/apache/spark/MapOutputTracker.scala:val 
arrayOpt = mapStatuses.get(shuffleId)
./core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:  
private val underlyingMethodOpt = {
./core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala:  
  val jobOpt = statusToJobs.flatMap(_._2).find { jobInfo => jobInfo.jobId == 
jobId}
./core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala:  val 
completionTimeOpt = jobUIData.completionTime
./core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala:val 
metricsOpt = taskUIData.metrics
./examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala: 
   val numIterOpt = options.remove("numIter").map(_.toInt)

./external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala:
  val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
./graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala:
val activeDirectionOpt = activeSetOpt.map(_._2)

./sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala:
  val intOpt = ctx.freshName("intOpt")

./sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala:
  val longOpt = ctx.freshName("longOpt")

./sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
  val newConditionOpt = conditionOpt match {

./streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala:
  private val serializableConfOpt = conf.map(new SerializableConfiguration(_))
./yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala: 
 val hostOpt = allocatedContainerToHostMap.get(containerId)
```

but all that said, I don't feel strongly; I just wanted to explain what I was thinking, and if you still want a change that's totally fine.





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-10-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81864626
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -478,16 +473,18 @@ private[spark] class TaskSetManager(
   // a good proxy to task serialization time.
   // val timeTaken = clock.getTime() - startTime
   val taskName = s"task ${info.id} in stage ${taskSet.id}"
-  logInfo(s"Starting $taskName (TID $taskId, $host, partition 
${task.partitionId}," +
-s" $taskLocality, ${serializedTask.limit} bytes)")
+  logInfo(s"Starting $taskName (TID $taskId, $host, executor 
${info.executorId}, " +
+s"partition ${task.partitionId}, $taskLocality, 
${serializedTask.limit} bytes)")
 
   sched.dagScheduler.taskStarted(task, info)
-  return Some(new TaskDescription(taskId = taskId, attemptNumber = 
attemptNum, execId,
+  Some(new TaskDescription(taskId = taskId, attemptNumber = 
attemptNum, execId,
--- End diff --

oh of course, thanks :)





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-10-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81865871
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -766,9 +782,11 @@ private[spark] class TaskSetManager(
 logWarning(failureReason)
 None
 }
-// always add to failed executors
-failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
-  put(info.executorId, clock.getTimeMillis())
+
+if (reason.countTowardsTaskFailures) {
--- End diff --

`failedExecutors` was the variable for the old blacklisting mechanism; it's completely gone now.  It had nothing to do with executors that have failed completely, despite the name:

```scala
// key is taskId (aka TaskInfo.index), value is a Map of executor id to 
when it failed
private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()
```





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-10-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81867147
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -766,9 +782,11 @@ private[spark] class TaskSetManager(
 logWarning(failureReason)
 None
 }
-// always add to failed executors
-failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
-  put(info.executorId, clock.getTimeMillis())
+
+if (reason.countTowardsTaskFailures) {
--- End diff --

the question about moving inside the if on 801 is a good one.  I guess I left the logic here to be more like it was before.  Honestly I find `isZombie` to be super-confusing; I always have to think hard about exactly what it means.  I guess your point is, if the taskset is a zombie, we're not going to schedule any more tasks from it, so there is no point in tracking more failures?  And furthermore, you can't "un-zombie" a taskset?

The only case I can think of where the logic is actually different is if you have speculative execution, and you get task failures after the taskset completes (not from TaskCommitDenied, but real task failures).  It doesn't matter at all for this PR, but it might matter in the context of app-level blacklisting, if you want to learn about bad executors.  E.g., maybe an executor first has stragglers, and then eventually fails things.  The current code I have for app-level blacklisting wouldn't take advantage of this anyway, since it just gets blacklisting info from a taskset when it succeeds and that's it.

anyway, for now I will go ahead and put it inside that block, but I just wanted to share my thoughts since it's a little subtle and I wanted to talk it through.
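
For reference, the shape being agreed on here is roughly the following sketch (the comment body stands in for the PR's actual failure bookkeeping, which is not reproduced here):

```scala
// Only count the failure (for blacklisting and for maxTaskFailures) while the task set
// is still live; a zombie task set won't schedule more tasks from here anyway.
if (!isZombie && reason.countTowardsTaskFailures) {
  // record the failure against the executor/node for blacklisting purposes,
  // bump the failure count for this task index, and abort the task set if needed
}
```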





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-10-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81867460
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala 
---
@@ -51,37 +54,67 @@ class BlacklistIntegrationSuite extends 
SchedulerIntegrationSuite[MultiExecutorM
 assertDataStructuresEmpty(noFailure = false)
   }
 
-  // even with the blacklist turned on, if maxTaskFailures is not more 
than the number
-  // of executors on the bad node, then locality preferences will lead to 
us cycling through
-  // the executors on the bad node, and still failing the job
+  // even with the blacklist turned on, bad configs can lead to job 
failure.  To survive one
+  // bad node, you need to make sure that
+  // maxTaskFailures > min(spark.blacklist.task.maxTaskAttemptsPerNode, 
nExecutorsPerHost)
   testScheduler(
 "With blacklist on, job will still fail if there are too many bad 
executors on bad host",
 extraConfs = Seq(
-  // set this to something much longer than the test duration so that 
executors don't get
-  // removed from the blacklist during the test
-  ("spark.scheduler.executorTaskBlacklistTime", "1000")
+  config.BLACKLIST_ENABLED.key -> "true",
+  config.MAX_TASK_ATTEMPTS_PER_NODE.key -> "5",
+  config.MAX_TASK_FAILURES.key -> "4",
+  "spark.testing.nHosts" -> "2",
+  "spark.testing.nExecutorsPerHost" -> "5",
+  "spark.testing.nCoresPerExecutor" -> "10",
+  // Blacklisting will normally immediately complain that this config 
is invalid -- the point
+  // of this test is to expose that the configuration is unsafe, so 
skip the validation.
+  "spark.blacklist.testing.skipValidation" -> "true"
--- End diff --

there are separate checks for the validation (`BlacklistTrackerSuite` 
"check blacklist configuration invariants") so I'll just delete this test.





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-10-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81868412
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config
+
+class BlacklistTrackerSuite extends SparkFunSuite {
+
+  test("blacklist still respects legacy configs") {
+val legacyKey = config.BLACKLIST_LEGACY_TIMEOUT_CONF.key
+
+{
+  val localConf = new SparkConf().setMaster("local")
+  assert(!BlacklistTracker.isBlacklistEnabled(localConf))
+  localConf.set(legacyKey, "5000")
+  assert(BlacklistTracker.isBlacklistEnabled(localConf))
+  assert(5000 === BlacklistTracker.getBlacklistTimeout(localConf))
+
+  localConf.set(legacyKey, "0")
+  assert(!BlacklistTracker.isBlacklistEnabled(localConf))
+}
+
+{
--- End diff --

good point, but I'll also turn blacklisting off by default, so this particular test will just be reorganized a little.





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-10-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81893799
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -809,32 +816,65 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
   test("Kill other task attempts when one attempt belonging to the same 
task succeeds") {
 sc = new SparkContext("local", "test")
 sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
-val taskSet = FakeTask.createTaskSet(4)
+val taskSet = FakeTask.createTaskSet(5)
 // Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
 sc.conf.set("spark.speculation.multiplier", "0.0")
+sc.conf.set("spark.speculation.quantile", "0.6")
 val clock = new ManualClock()
 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock)
 val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = 
taskSet.tasks.map { task =>
   task.metrics.internalAccums
 }
 // Offer resources for 4 tasks to start
+val tasks = new ArrayBuffer[TaskDescription]()
 for ((k, v) <- List(
 "exec1" -> "host1",
 "exec1" -> "host1",
+"exec1" -> "host1",
 "exec2" -> "host2",
 "exec2" -> "host2")) {
   val taskOption = manager.resourceOffer(k, v, NO_PREF)
   assert(taskOption.isDefined)
   val task = taskOption.get
   assert(task.executorId === k)
+  tasks += task
 }
-assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
-// Complete the 3 tasks and leave 1 task in running
+assert(sched.startedTasks.toSet === (0 until 5).toSet)
+// Complete 3 tasks and leave 2 task in running
 for (id <- Set(0, 1, 2)) {
   manager.handleSuccessfulTask(id, createTaskResult(id, 
accumUpdatesByTask(id)))
   assert(sched.endedTasks(id) === Success)
 }
 
+def runningTaskForIndex(index: Int): TaskDescription = {
+  val t = tasks.find { task => task.index == index && 
!sched.endedTasks.contains(task.taskId) }
+  t match {
+case Some(x) => x
+case None =>
+  throw new RuntimeException(s"couldn't find index $index in " +
+s"tasks: ${tasks.map{t => t.index -> t.taskId}} with 
endedTasks:" +
+s" ${sched.endedTasks.keys}")
+  }
+}
+
+// have each of the running tasks fail 3 times (not enough to abort 
the stage)
+(3 until 6).foreach { attempt =>
--- End diff --

it didn't *need* to, in the sense that the test would pass without these changes.  I was concerned about my change to `TaskKilled.countTowardsFailure` -- in particular, I wanted to make sure that when you have speculative tasks, cancelling speculative tasks doesn't lead to aborting the taskset.  This wasn't tested at all before, so I expanded this test to cover that.  So the additions here are to make sure there are 4 task "failures", but one of them is just a speculative task being canceled.  The other changes to create 2 speculative tasks are also just for better coverage -- I wanted the first successful speculative task to *not* complete the taskset.  Unfortunately that made some of the bookkeeping more complicated.

In hindsight, it's probably pointless to throw it all into the same test.  I'll break it out into another one.  It'll need to do virtually everything the current test does, so it will be redundant from a coverage perspective.  But hopefully the intent of the tests will be clearer.





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-10-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81895140
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.{HashMap, HashSet}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Clock
+
+/**
+ * Handles blacklisting executors and nodes within a taskset.  This 
includes blacklisting specific
+ * (task, executor) / (task, nodes) pairs, and also completely 
blacklisting executors and nodes
+ * for the entire taskset.
+ *
+ * THREADING:  As a helper to [[TaskSetManager]], this class is designed 
to only be called from code
+ * with a lock on the TaskScheduler (e.g. its event handlers). It should 
not be called from other
+ * threads.
+ */
+private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val 
stageId: Int, val clock: Clock)
+extends Logging {
+
+  private val MAX_TASK_ATTEMPTS_PER_EXECUTOR = 
conf.get(config.MAX_TASK_ATTEMPTS_PER_EXECUTOR)
+  private val MAX_TASK_ATTEMPTS_PER_NODE = 
conf.get(config.MAX_TASK_ATTEMPTS_PER_NODE)
+  private val MAX_FAILURES_PER_EXEC_STAGE = 
conf.get(config.MAX_FAILURES_PER_EXEC_STAGE)
+  private val MAX_FAILED_EXEC_PER_NODE_STAGE = 
conf.get(config.MAX_FAILED_EXEC_PER_NODE_STAGE)
+  private val TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf)
+
+  /**
+   * A map from each executor to the task failures on that executor.
+   */
+  val execToFailures: HashMap[String, ExecutorFailuresInTaskSet] = new 
HashMap()
+
+  /**
+   * Map from node to all executors on it with failures.  Needed because 
we want to know about
+   * executors on a node even after they have died.
+   */
+  private val nodeToExecsWithFailures: HashMap[String, HashSet[String]] = 
new HashMap()
+  private val nodeToBlacklistedTasks: HashMap[String, HashSet[Int]] = new 
HashMap()
+  private val blacklistedExecs: HashSet[String] = new HashSet()
+  private val blacklistedNodes: HashSet[String] = new HashSet()
+
+  /**
+   * Return true if this executor is blacklisted for the given task.  This 
does *not*
+   * need to return true if the executor is blacklisted for the entire 
stage.
+   * That is to keep this method as fast as possible in the inner-loop of 
the
+   * scheduler, where those filters will have already been applied.
+   */
+  def isExecutorBlacklistedForTask(
+  executorId: String,
+  index: Int): Boolean = {
+execToFailures.get(executorId)
+  .map { execFailures =>
+val count = 
execFailures.taskToFailureCountAndExpiryTime.get(index).map(_._1).getOrElse(0)
+count >= MAX_TASK_ATTEMPTS_PER_EXECUTOR
+  }
+  .getOrElse(false)
+  }
+
+  def isNodeBlacklistedForTask(
+  node: String,
+  index: Int): Boolean = {
+nodeToBlacklistedTasks.get(node)
+  .map(_.contains(index))
+  .getOrElse(false)
+  }
+
+  /**
+   * Return true if this executor is blacklisted for the given stage.  Completely ignores
+   * anything to do with the node the executor is on.  That is to keep this method as fast
+   * as possible in the inner-loop of the scheduler, where those filters will already have
+   * been applied.
+   */
+  def isExecutorBlacklistedForTaskSet(executorId: String): Boolean = {
+blacklistedExecs.contains(executorId)
+  }
+
+  def isNodeBlacklistedForTaskSet(node: String): Boolean = {
+blacklistedNodes.contains(node)
+  }
--- End diff --

I know it's verbose, but I'd prefer to keep it, especially once application-level 
blacklisting is added (https://github.com/apache/spark/pull/1407
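
A minimal standalone sketch (not Spark's classes -- `SimpleTaskSetBlacklist` and its thresholds are hypothetical) of why the per-(task, executor) and per-taskset queries stay separate: an executor can be off-limits for one task long before it is blacklisted for the whole taskset, and the per-task check is the one that sits in the scheduler's inner loop.

import scala.collection.mutable

// Toy model of the structure above, not Spark code.
class SimpleTaskSetBlacklist(maxTaskAttemptsPerExecutor: Int, maxFailuresPerExecStage: Int) {
  // per-executor map of task index -> failure count, standing in for execToFailures
  private val execToTaskFailures = mutable.HashMap.empty[String, mutable.HashMap[Int, Int]]
  private val blacklistedExecs = mutable.HashSet.empty[String]

  def taskFailed(exec: String, taskIndex: Int): Unit = {
    val failures = execToTaskFailures.getOrElseUpdate(exec, mutable.HashMap.empty)
    failures(taskIndex) = failures.getOrElse(taskIndex, 0) + 1
    // once enough distinct tasks have exhausted their attempts on this executor,
    // blacklist it for the whole taskset
    if (failures.count(_._2 >= maxTaskAttemptsPerExecutor) >= maxFailuresPerExecStage) {
      blacklistedExecs += exec
    }
  }

  // fast per-(task, executor) check for the scheduler's inner loop; deliberately does
  // NOT re-check the taskset-level blacklist, which has been filtered earlier
  def isExecutorBlacklistedForTask(exec: String, taskIndex: Int): Boolean =
    execToTaskFailures.get(exec).exists(_.getOrElse(taskIndex, 0) >= maxTaskAttemptsPerExecutor)

  def isExecutorBlacklistedForTaskSet(exec: String): Boolean = blacklistedExecs.contains(exec)
}

object BlacklistQuerySketch extends App {
  val bl = new SimpleTaskSetBlacklist(maxTaskAttemptsPerExecutor = 1, maxFailuresPerExecStage = 2)
  bl.taskFailed("exec1", taskIndex = 0)
  println(bl.isExecutorBlacklistedForTask("exec1", 0))  // true: task 0 won't go back to exec1
  println(bl.isExecutorBlacklistedForTaskSet("exec1"))  // false: exec1 still usable for other tasks
  bl.taskFailed("exec1", taskIndex = 1)
  println(bl.isExecutorBlacklistedForTaskSet("exec1"))  // true: two distinct tasks failed there
}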

[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-10-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81896656
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[scheduler] object BlacklistTracker extends Logging {
+
+  private val DEFAULT_TIMEOUT = "1h"
+
+  /**
+   * Returns true if the blacklist is enabled, based on checking the 
configuration in the following
+   * order:
+   * 1. Is it specifically enabled or disabled?
+   * 2. Is it enabled via the legacy timeout conf?
+   * 3. Use the default for the spark-master:
+   *   - off for local mode
+   *   - on for distributed modes (including local-cluster)
+   */
+  def isBlacklistEnabled(conf: SparkConf): Boolean = {
+conf.get(config.BLACKLIST_ENABLED) match {
+  case Some(isEnabled) =>
+isEnabled
+  case None =>
+// if they've got a non-zero setting for the legacy conf, always 
enable the blacklist,
+// otherwise, use the default based on the cluster-mode (off for 
local-mode, on otherwise).
+val legacyKey = config.BLACKLIST_LEGACY_TIMEOUT_CONF.key
+conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF) match {
+  case Some(legacyTimeout) =>
+if (legacyTimeout == 0) {
+  logWarning(s"Turning off blacklisting due to legacy 
configuaration:" +
+s" $legacyKey == 0")
+  false
+} else {
+  // mostly this is necessary just for tests, since real users 
that want the blacklist
+  // will get it anyway by default
+  logWarning(s"Turning on blacklisting due to legacy 
configuration:" +
+s" $legacyKey > 0")
+  true
+}
+  case None =>
+// local-cluster is *not* considered local for these purposes, 
we still want the
+// blacklist enabled by default
+!Utils.isLocalMaster(conf)
+}
+}
+  }
+
+  def getBlacklistTimeout(conf: SparkConf): Long = {
+conf.get(config.BLACKLIST_TIMEOUT_CONF).getOrElse {
+  conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF).getOrElse {
+Utils.timeStringAsMs(DEFAULT_TIMEOUT)
+  }
+}
+  }
+
+  /**
+   * Verify that blacklist configurations are consistent; if not, throw an 
exception.  Should only
+   * be called if blacklisting is enabled.
+   *
+   * The configuration for the blacklist is expected to adhere to a few 
invariants.  Default
+   * values follow these rules of course, but users may unwittingly change 
one configuration
+   * without making the corresponding adjustment elsewhere.  This ensures 
we fail-fast when
+   * there are such misconfigurations.
+   */
+  def validateBlacklistConfs(conf: SparkConf): Unit = {
+
+def mustBePos(k: String, v: String): Unit = {
+  throw new IllegalArgumentException(s"$k was $v, but must be > 0.")
+}
+
+// undocumented escape hatch for validation -- just for tests that 
want to run in an "unsafe"
+// configuration.
+if (!conf.get("spark.blacklist.testing.skipValidation", 
"false").toBoolean) {
+
+  Seq(
+config.MAX_TASK_ATTEMPTS_PER_EXECUTOR,
+config.MAX_TASK_ATTEMPTS_PER_NODE,
+config.MAX_FAILURES_PER_EXEC_STAGE,
+config.MAX_FAILED_EXEC_PER_NODE_STAGE
+  ).foreach { config =>
+val v = conf.get(config)
+if (v <= 0) {
+  mustBeP
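
A minimal sketch of the enable/disable precedence that isBlacklistEnabled implements, using plain Options in place of Spark's config machinery (the function and its parameters here are illustrative, not Spark API):

object BlacklistEnabledSketch extends App {
  def blacklistEnabled(
      explicitFlag: Option[Boolean],    // the blacklist-enabled conf, if explicitly set
      legacyTimeoutMs: Option[Long],    // the legacy timeout conf, if set
      isLocalMaster: Boolean): Boolean = {
    explicitFlag.getOrElse {
      legacyTimeoutMs match {
        case Some(t) => t > 0           // any non-zero legacy timeout turns blacklisting on
        case None    => !isLocalMaster  // default: off for local mode, on for distributed modes
      }
    }
  }

  assert(!blacklistEnabled(None, Some(0L), isLocalMaster = false))     // legacy timeout of 0 disables it
  assert(blacklistEnabled(Some(true), Some(0L), isLocalMaster = true)) // the explicit flag always wins
  assert(!blacklistEnabled(None, None, isLocalMaster = true))          // nothing set: off only in local mode
}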

[GitHub] spark issue #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSets

2016-10-04 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/15249
  
@mridulm on the questions about expiry from blacklists, you are not missing 
anything -- this explicitly does not do any timeouts at the taskset level (this 
is mentioned in the design doc).  The timeout code you see is mostly just 
incremental stuff as a step towards https://github.com/apache/spark/pull/14079, 
but doesn't actually add any value here.

The primary motivation for blacklisting that I've seen is actually quite 
different from the use case you are describing -- it's not to help deal w/ 
resource contention, but to deal w/ truly broken resources (a bad disk in all 
the cases I can think of).  In fact, in these cases, 1 hour is really short -- 
users probably want something more like 6-12 hours.  But 1 hr really isn't so 
bad; it just means that the bad resources need to be "rediscovered" that often, 
with a scheduling hiccup while that happens.

The use case you are describing is really different -- it's a form of backoff 
to deal w/ resource contention.  I have actually talked to a couple of 
different folks about doing something like this recently and think it would be 
great, though I see problems with this approach, since it allows other tasks 
to still be scheduled on those executors, and also the time isn't relative to 
the task runtime, etc.

Nonetheless, an issue here might be that the old option served some purpose 
which is no longer supported.  Do we need to add it back in?  Just adding the 
logic for the timeouts again is pretty easy, though
(a) I need to figure out the right place to do it so that it doesn't impact 
scheduling performance

and more importantly

(b) I really worry about being able to configure things so that blacklisting 
can actually handle totally broken resources.  E.g., say that you set the 
timeout to 10s.  If your tasks take 1 minute each, then your one bad executor 
might cycle through the leftover tasks, fail them all, pass the timeout, and 
repeat that cycle a few times till you go over spark.task.maxFailures (a toy 
simulation of this cycle is sketched after this message).  I don't see a good 
way to deal with this while setting a sensible timeout for the entire 
application.

Two other workarounds:

(2) Just enable the per-task timeout when the legacy configuration is used, and 
leave it undocumented.  We don't change behavior then, but the configuration is 
kind of a mess, and it'll be a headache to continue to maintain this.

(3) Add a timeout just to *taskset*-level blacklisting.  So it's a behavior 
change from the existing blacklisting, which has a timeout per *task*.  This 
removes the interaction w/ spark.task.maxFailures that we've always got to 
tiptoe around.  I also think it might satisfy your use case even better.  I 
still don't think it's a great solution to the problem, and we need something 
else for handling this sort of backoff better, so I don't feel great about it 
getting shoved into this feature.

I'm thinking (3) is the best, but I will give it a bit more thought.  Also 
pinging @kayousterhout @tgravescs @markhamstra for opinions, since this is a 
bigger design point to consider.
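
A toy simulation of the cycle in (b), with hypothetical numbers (a 10s per-task timeout, 3 leftover tasks, a bad executor that fails attempts in ~1s, healthy attempts of ~60s); the point is only how quickly one bad executor can burn through spark.task.maxFailures when the per-task timeout is short:

object BadExecutorCycleSketch extends App {
  val maxTaskFailures = 4             // spark.task.maxFailures
  val blacklistTimeoutSec = 10        // hypothetical per-task blacklist timeout
  val failedAttemptSec = 1            // a failed attempt on the bad disk is quick
  val healthyAttemptSec = 60          // a healthy attempt elsewhere takes about a minute
  val taskFailures = Array.fill(3)(0) // 3 leftover tasks, all pending on the bad executor
  var elapsedSec = 0
  var aborted = false

  while (!aborted) {
    // the bad executor cycles through the leftover tasks and fails them all ...
    taskFailures.indices.foreach { i =>
      if (!aborted) {
        taskFailures(i) += 1
        elapsedSec += failedAttemptSec
        aborted = taskFailures(i) >= maxTaskFailures
      }
    }
    // ... then waits out the per-task blacklist and repeats
    if (!aborted) elapsedSec += blacklistTimeoutSec
  }
  println(s"a task hit $maxTaskFailures failures after ~$elapsedSec seconds, " +
    s"before a single healthy ${healthyAttemptSec}s attempt could even finish")
}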





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-10-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81898588
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[scheduler] object BlacklistTracker extends Logging {
+
+  private val DEFAULT_TIMEOUT = "1h"
--- End diff --

(longer top-level comment responding to this)





[GitHub] spark issue #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSets

2016-10-04 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/15249
  
@kayousterhout @mridulm thanks for the feedback.  I obviously still need to 
figure out the timeout thing, but otherwise I think I've addressed things.  I 
will do another pass in the morning.





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-10-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81999277
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -592,34 +589,54 @@ private[spark] class TaskSetManager(
* failures (this is because the method picks one unscheduled task, and then iterates through each
* executor until it finds one that the task hasn't failed on already).
*/
-  private[scheduler] def abortIfCompletelyBlacklisted(executors: 
Iterable[String]): Unit = {
-
-val pendingTask: Option[Int] = {
-  // usually this will just take the last pending task, but because of 
the lazy removal
-  // from each list, we may need to go deeper in the list.  We poll 
from the end because
-  // failed tasks are put back at the end of allPendingTasks, so we're 
more likely to find
-  // an unschedulable task this way.
-  val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet =>
-copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)
-  }
-  if (indexOffset == -1) {
-None
-  } else {
-Some(allPendingTasks(indexOffset))
-  }
-}
+  private[scheduler] def abortIfCompletelyBlacklisted(
--- End diff --

Wait, why isn't the "most common scenario" still applicable?  If you have 
two executors and 4 max failures, then after the task fails twice (once on 
each executor), you won't be able to schedule anything.
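
A hypothetical walk-through of that scenario, assuming one allowed attempt per (task, executor) pair (names and numbers are illustrative, not Spark code):

object CompletelyBlacklistedSketch extends App {
  val executors = Set("exec1", "exec2")
  val maxTaskFailures = 4
  var failedOn = Set.empty[String]   // executors this task has already failed on

  failedOn += "exec1"                // attempt 1 fails
  failedOn += "exec2"                // attempt 2 fails

  val schedulable = executors.diff(failedOn)
  // only 2 failures so far, below maxTaskFailures = 4, but there is nowhere left to run
  // the task -- the remaining failures will never happen, so rather than hang, the
  // taskset should be aborted proactively.
  println(s"failures = ${failedOn.size}, schedulable executors for this task = $schedulable")
}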





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-10-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r82007301
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -809,32 +816,65 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
   test("Kill other task attempts when one attempt belonging to the same 
task succeeds") {
 sc = new SparkContext("local", "test")
 sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
-val taskSet = FakeTask.createTaskSet(4)
+val taskSet = FakeTask.createTaskSet(5)
 // Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
 sc.conf.set("spark.speculation.multiplier", "0.0")
+sc.conf.set("spark.speculation.quantile", "0.6")
 val clock = new ManualClock()
 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock)
 val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = 
taskSet.tasks.map { task =>
   task.metrics.internalAccums
 }
 // Offer resources for 4 tasks to start
+val tasks = new ArrayBuffer[TaskDescription]()
 for ((k, v) <- List(
 "exec1" -> "host1",
 "exec1" -> "host1",
+"exec1" -> "host1",
 "exec2" -> "host2",
 "exec2" -> "host2")) {
   val taskOption = manager.resourceOffer(k, v, NO_PREF)
   assert(taskOption.isDefined)
   val task = taskOption.get
   assert(task.executorId === k)
+  tasks += task
 }
-assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
-// Complete the 3 tasks and leave 1 task in running
+assert(sched.startedTasks.toSet === (0 until 5).toSet)
+// Complete 3 tasks and leave 2 task in running
 for (id <- Set(0, 1, 2)) {
   manager.handleSuccessfulTask(id, createTaskResult(id, 
accumUpdatesByTask(id)))
   assert(sched.endedTasks(id) === Success)
 }
 
+def runningTaskForIndex(index: Int): TaskDescription = {
+  val t = tasks.find { task => task.index == index && 
!sched.endedTasks.contains(task.taskId) }
+  t match {
+case Some(x) => x
+case None =>
+  throw new RuntimeException(s"couldn't find index $index in " +
+s"tasks: ${tasks.map{t => t.index -> t.taskId}} with 
endedTasks:" +
+s" ${sched.endedTasks.keys}")
+  }
+}
+
+// have each of the running tasks fail 3 times (not enough to abort 
the stage)
+(3 until 6).foreach { attempt =>
--- End diff --

After sleeping on this, I actually think it's worth updating the original 
test.  It used to only check that the task was killed when there was a 
lingering attempt for the *final* task.  That's a rather special case -- it 
seems like it's worth adding a check for the other tasks as well.  That adds 
most of the complexity here (with two speculative tasks, you need the extra 
book-keeping to figure out what you're dealing with).

I could put it back in one test, and change the test name to include both 
aspects being tested, maybe "Kill other task attempts when one attempt 
belonging to the same task succeeds, and ensure killed tasks do not count 
towards failing task set".

Or I could make two different tests with virtually the same setup (with 
shared code) and just different asserts.
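
A rough sketch of the second option -- shared setup with two tests that only differ in their asserts -- using illustrative names and hard-coded results in place of the real FakeTaskScheduler / TaskSetManager machinery (assumes ScalaTest's AnyFunSuite):

import org.scalatest.funsuite.AnyFunSuite

class SpeculationSuiteSketch extends AnyFunSuite {
  // stand-in for the setup shared by both tests; returns (killed attempt indices,
  // failure count charged to the task set)
  private def runSpeculativeScenario(): (Set[Int], Int) = {
    val killedAttempts = Set(3, 4) // pretend the lingering attempts of tasks 3 and 4 were killed
    val chargedFailures = 0        // kills triggered by speculation are not counted as failures
    (killedAttempts, chargedFailures)
  }

  test("kill other attempts when one attempt belonging to the same task succeeds") {
    val (killed, _) = runSpeculativeScenario()
    assert(killed === Set(3, 4))
  }

  test("killed task attempts do not count towards failing the task set") {
    val (_, failures) = runSpeculativeScenario()
    assert(failures === 0)
  }
}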





[GitHub] spark issue #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSets

2016-10-05 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/15249
  
I forgot to add that I had turned off blacklisting by default; I agree with 
your suggestion, Kay.  I pushed another commit which updated the docs as well.  
There are some other small style things and a couple of added comments etc. in 
there too.





[GitHub] spark pull request #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure r...

2016-10-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15335#discussion_r82021050
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1255,27 +1255,46 @@ class DAGScheduler(
   s"longer running")
   }
 
-  if (disallowStageRetryForTest) {
-abortStage(failedStage, "Fetch failure will not retry stage 
due to testing config",
-  None)
-  } else if 
(failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
-abortStage(failedStage, s"$failedStage (${failedStage.name}) " 
+
-  s"has failed the maximum allowable number of " +
-  s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
-  s"Most recent failure reason: ${failureMessage}", None)
-  } else {
-if (failedStages.isEmpty) {
-  // Don't schedule an event to resubmit failed stages if 
failed isn't empty, because
-  // in that case the event will already have been scheduled.
-  // TODO: Cancel running tasks in the stage
-  logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-s"$failedStage (${failedStage.name}) due to fetch failure")
-  messageScheduler.schedule(new Runnable {
-override def run(): Unit = 
eventProcessLoop.post(ResubmitFailedStages)
-  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+  val shouldAbortStage =
+failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
+disallowStageRetryForTest
+
+  if (shouldAbortStage) {
+val abortMessage = if (disallowStageRetryForTest) {
+  "Fetch failure will not retry stage due to testing config"
+} else {
+  s"""$failedStage (${failedStage.name})
+ |has failed the maximum allowable number of
+ |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
+ |Most recent failure reason: 
$failureMessage""".stripMargin.replaceAll("\n", " ")
 }
+abortStage(failedStage, abortMessage, None)
+  } else { // update failedStages and make sure a 
ResubmitFailedStages event is enqueued
+// TODO: Cancel running tasks in the failed stage -- cf. 
SPARK-17064
+val noResubmitEnqueued = !failedStages.contains(failedStage)
 failedStages += failedStage
 failedStages += mapStage
+if (noResubmitEnqueued) {
+  // We expect one executor failure to trigger many 
FetchFailures in rapid succession,
+  // but all of those task failures can typically be handled 
by a single resubmission of
+  // the failed stage.  We avoid flooding the scheduler's 
event queue with resubmit
+  // messages by checking whether a resubmit is already in the 
event queue for the
+  // failed stage.  If there is already a resubmit enqueued 
for a different failed
+  // stage, that event would also be sufficient to handle the 
current failed stage, but
+  // producing a resubmit for each failed stage makes 
debugging and logging a little
+  // simpler while not producing an overwhelming number of 
scheduler events.
+  logInfo(
+s"Resubmitting $mapStage (${mapStage.name}) and " +
+s"$failedStage (${failedStage.name}) due to fetch failure"
+  )
+  messageScheduler.schedule(
--- End diff --

I find myself frequently wondering about the purpose of this.  It's 
commented very tersely on RESUBMIT_TIMEOUT, but I think it might be nice to 
add a longer comment here.  I guess something like:

"If we get one fetch-failure, we often get more fetch failures across 
multiple executors.  We will get better parallelism when we resubmit the 
mapStage if we can resubmit when we know about as many of those failures as 
possible.  So this is a heuristic to add a *small* delay to see if we gather a 
few more failures before we resubmit."

We do *not* need the delay to figure out exactly which shuffle-map outputs 
are gone on the executor -- we always [mark the executor as lost on a fetch 
failure](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1288),
 which means we mark all its map output as gone.  (This is really confusing -- 
it *looks* like we only [remove the one shuffle
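
A standalone sketch of the "at most one delayed Resubmit per failed stage" pattern described above, using a plain ScheduledExecutorService in place of the DAGScheduler's messageScheduler (stage ids and the 200ms delay are illustrative):

import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable

object ResubmitDebounceSketch extends App {
  val RESUBMIT_TIMEOUT_MS = 200L
  private val messageScheduler = Executors.newSingleThreadScheduledExecutor()
  private val failedStages = mutable.Set.empty[Int]

  def onFetchFailure(failedStage: Int, mapStage: Int): Unit = ResubmitDebounceSketch.synchronized {
    // only schedule a Resubmit if one isn't already enqueued for this failed stage;
    // later fetch failures just accumulate in failedStages until the resubmit fires
    val noResubmitEnqueued = !failedStages.contains(failedStage)
    failedStages += failedStage
    failedStages += mapStage
    if (noResubmitEnqueued) {
      messageScheduler.schedule(new Runnable {
        override def run(): Unit = ResubmitDebounceSketch.synchronized {
          println(s"resubmitting failed stages: $failedStages") // one event for many failures
          failedStages.clear()
        }
      }, RESUBMIT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
    }
  }

  // many fetch failures in rapid succession -> a single delayed resubmit
  (1 to 5).foreach(_ => onFetchFailure(failedStage = 2, mapStage = 1))
  Thread.sleep(500)
  messageScheduler.shutdown()
}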

[GitHub] spark pull request #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure r...

2016-10-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/15335#discussion_r82010161
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1255,27 +1255,46 @@ class DAGScheduler(
   s"longer running")
   }
 
-  if (disallowStageRetryForTest) {
-abortStage(failedStage, "Fetch failure will not retry stage 
due to testing config",
-  None)
-  } else if 
(failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
-abortStage(failedStage, s"$failedStage (${failedStage.name}) " 
+
-  s"has failed the maximum allowable number of " +
-  s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
-  s"Most recent failure reason: ${failureMessage}", None)
-  } else {
-if (failedStages.isEmpty) {
-  // Don't schedule an event to resubmit failed stages if 
failed isn't empty, because
-  // in that case the event will already have been scheduled.
-  // TODO: Cancel running tasks in the stage
-  logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-s"$failedStage (${failedStage.name}) due to fetch failure")
-  messageScheduler.schedule(new Runnable {
-override def run(): Unit = 
eventProcessLoop.post(ResubmitFailedStages)
-  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+  val shouldAbortStage =
+failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
+disallowStageRetryForTest
+
+  if (shouldAbortStage) {
+val abortMessage = if (disallowStageRetryForTest) {
+  "Fetch failure will not retry stage due to testing config"
+} else {
+  s"""$failedStage (${failedStage.name})
+ |has failed the maximum allowable number of
+ |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
+ |Most recent failure reason: 
$failureMessage""".stripMargin.replaceAll("\n", " ")
 }
+abortStage(failedStage, abortMessage, None)
+  } else { // update failedStages and make sure a 
ResubmitFailedStages event is enqueued
+// TODO: Cancel running tasks in the failed stage -- cf. 
SPARK-17064
+val noResubmitEnqueued = !failedStages.contains(failedStage)
--- End diff --

The name is slightly misleading, since there might be a Resubmit enqueued for 
another stage.  Maybe `noResubmitEnqueuedForThisStage`?  But perhaps that's 
just longer without adding any clarity -- the comment below explains it pretty 
well, so I'm ok w/ the name, just thinking aloud.

Also, I was wondering whether this should be 
`!failedStages.contains(failedStage) || !failedStages.contains(mapStage)`.  Is 
there any scenario where it would contain `failedStage` but not `mapStage`?  I 
couldn't come up with anything, but I also wonder if there are enough weird 
scenarios we easily overlook that it might be better to keep it in just in case.

We could also try to send even fewer Resubmit events -- if the stage is 
already in `waitingStages`, we don't need to resubmit.  But I think I'd prefer 
not to go that far, since it's always safe to over-Resubmit, and I'm worried 
we may overlook a case.




