[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r165764166
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
+    // set checkpointDir.
+    val tempDir = Utils.createTempDir()
+    val checkpointDir = File.createTempFile("temp", "", tempDir)
+    checkpointDir.delete()
+    sc.setCheckpointDir(checkpointDir.toString)
+
+    // Semaphores to control the process sequence for the two threads below.
+    val semaphore1 = new Semaphore(0)
+    val semaphore2 = new Semaphore(0)
+
+    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
+    rdd.checkpoint()
+
+    val checkpointRunnable = new Runnable {
+      override def run() = {
+        // Simply simulate what RDD.doCheckpoint() do here.
+        rdd.doCheckpointCalled = true
+        val checkpointData = rdd.checkpointData.get
+        RDDCheckpointData.synchronized {
+          if (checkpointData.cpState == CheckpointState.Initialized) {
+            checkpointData.cpState = CheckpointState.CheckpointingInProgress
+          }
+        }
+
+        val newRDD = checkpointData.doCheckpoint()
+
+        // Release semaphore1 after job triggered in checkpoint finished, so that taskBinary
+        // serialization can start.
+        semaphore1.release()
+        // Wait until taskBinary serialization finished in submitMissingTasksThread.
+        semaphore2.acquire()
+
+        // Update our state and truncate the RDD lineage.
+        RDDCheckpointData.synchronized {
+          checkpointData.cpRDD = Some(newRDD)
+          checkpointData.cpState = CheckpointState.Checkpointed
+          rdd.markCheckpointed()
+        }
+        semaphore1.release()
+      }
+    }
+
+    val submitMissingTasksRunnable = new Runnable {
+      override def run() = {
+        // Simply simulate the process of submitMissingTasks.
+        // Wait until doCheckpoint job running finished, but checkpoint status not changed.
+        semaphore1.acquire()
+
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+
+        // Simply simulate task serialization while submitMissingTasks.
+        // Task serialized with rdd checkpoint not finished.
+        val cleanedFunc = sc.clean(Utils.getIteratorSize _)
+        val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
+        val taskBinaryBytes = JavaUtils.bufferToArray(
+          ser.serialize((rdd, func): AnyRef))
+        // Because partition calculate is in a synchronized block, so in the fixed code
+        // partition is calculated here.
+        val correctPart = rdd.partitions(0)
+
+        // Release semaphore2 so changing checkpoint status to Checkpointed will be done in
+        // checkpointThread.
+        semaphore2.release()
+        // Wait until checkpoint status changed to Checkpointed in checkpointThread.
+        semaphore1.acquire()
+
+        // Part calculated with rdd checkpoint already finished.
+        val errPart = rdd.partitions(0)
+
+        // TaskBinary will be deserialized when run task in executor.
+        val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Unit)](
+          ByteBuffer.wrap(taskBinaryBytes), Thread.currentThread.getContextClassLoader)
+
+        val taskContext = mock(classOf[TaskContext])
+        doNothing().when(taskContext).killTaskIfInterrupted()
+
+        // ClassCastException is expected with errPart.
--- End diff --

I think this is a bit easier to follow if you say

Make sure our test 

[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r165763669
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
[... same diff hunk as quoted in full above ...]
+        // Release semaphore2 so changing checkpoint status to Checkpointed will be done in
+        // checkpointThread.
+        semaphore2.release()
+        // Wait until checkpoint status changed to Checkpointed in checkpointThread.
+        semaphore1.acquire()
+
+        // Part calculated with rdd checkpoint already finished.
--- End diff --

I'd add a comment above this:

Now we're done simulating the interleaving that might happen within the 
scheduler -- we'll check to make sure the final state is OK by simulating a 
couple steps that normally happen on the executor.
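
(For reference, a sketch of those executor-side steps -- the variable names are all from the quoted test above, though the exact assertions in the final patch may differ:)

```scala
// errPart was computed after the checkpoint state flipped, so the Partition cast
// inside the deserialized rdd's compute() fails.
intercept[ClassCastException] {
  taskFunc(taskContext, taskRdd.iterator(errPart, taskContext))
}
// correctPart was computed under the same checkpoint state as taskBinaryBytes,
// so the task runs cleanly.
taskFunc(taskContext, taskRdd.iterator(correctPart, taskContext))
```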


---




[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r165761754
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
[... same diff hunk as quoted in full above ...]
+    val checkpointRunnable = new Runnable {
+      override def run() = {
+        // Simply simulate what RDD.doCheckpoint() do here.
--- End diff --

I'd remove "simply" here and elsewhere in comments.  Also "do" -> "does"


---




[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r165761207
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
[... same diff hunk as quoted in full above ...]
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
+    // set checkpointDir.
+    val tempDir = Utils.createTempDir()
+    val checkpointDir = File.createTempFile("temp", "", tempDir)
+    checkpointDir.delete()
--- End diff --

why do you make a tempfile for the checkpoint dir and then delete it?  why not just `checkpointDir = new File(tempDir, "checkpointing")`?  Or even just `checkpointDir = Utils.createTempDir()`?

(CheckpointSuite does this so it can call `sc.setCheckpointDir`, but you're not doing that here)
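
(i.e., a minimal sketch of that setup, assuming nothing else needs the create-then-delete dance:)

```scala
// A dedicated subdirectory under the temp dir is enough here.
val tempDir = Utils.createTempDir()
val checkpointDir = new File(tempDir, "checkpointing")
sc.setCheckpointDir(checkpointDir.toString)
```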


---




[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r165763018
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
[... same diff hunk as quoted in full above ...]
+        // Release semaphore1 after job triggered in checkpoint finished, so that taskBinary
+        // serialization can start.
+        semaphore1.release()
+        // Wait until taskBinary serialization finished in submitMissingTasksThread.
+        semaphore2.acquire()
--- End diff --

this would be a bit easier to follow if you rename your semaphores a bit.

`semaphore1` -> `doCheckpointStarted`
`semaphore2` -> `taskBinaryBytesFinished`
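
(As a sketch, with the renames applied:)

```scala
import java.util.concurrent.Semaphore

// Renamed as suggested: a permit now reads as "this event has happened".
val doCheckpointStarted = new Semaphore(0)
val taskBinaryBytesFinished = new Semaphore(0)
```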


---




[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r165764342
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
[... same diff hunk as quoted in full above ...]
+        // ClassCastException is expected with errPart.
+        intercept[ClassCastException] {
+          // Triggered when runTask in executor.
+          t

[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r165763274
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
[... same diff hunk as quoted in full above ...]
+        // Update our state and truncate the RDD lineage.
+        RDDCheckpointData.synchronized {
+          checkpointData.cpRDD = Some(newRDD)
+          checkpointData.cpState = CheckpointState.Checkpointed
+          rdd.markCheckpointed()
+        }
+        semaphore1.release()
--- End diff --

and then this would be another semaphore `checkpointStateUpdated`
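
(Putting the three suggested names together, a sketch of the full handshake:)

```scala
import java.util.concurrent.Semaphore

// With all three renames, the interleaving reads off the semaphore names:
//   checkpoint thread:    doCheckpointStarted.release()
//   submitMissingTasks:   doCheckpointStarted.acquire(); serialize taskBinaryBytes;
//                         taskBinaryBytesFinished.release()
//   checkpoint thread:    taskBinaryBytesFinished.acquire(); mark Checkpointed;
//                         checkpointStateUpdated.release()
//   submitMissingTasks:   checkpointStateUpdated.acquire(); compute errPart
val doCheckpointStarted = new Semaphore(0)
val taskBinaryBytesFinished = new Semaphore(0)
val checkpointStateUpdated = new Semaphore(0)
```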


---




[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r165759800
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1016,15 +1016,23 @@ class DAGScheduler(
     // might modify state of objects referenced in their closures. This is necessary in Hadoop
     // where the JobConf/Configuration object is not thread-safe.
     var taskBinary: Broadcast[Array[Byte]] = null
+    var partitions: Array[Partition] = null
     try {
       // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
       // For ResultTask, serialize and broadcast (rdd, func).
-      val taskBinaryBytes: Array[Byte] = stage match {
-        case stage: ShuffleMapStage =>
-          JavaUtils.bufferToArray(
-            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
-        case stage: ResultStage =>
-          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
+      var taskBinaryBytes: Array[Byte] = null
+      // Add synchronized block to avoid rdd deserialized from taskBinaryBytes has diff checkpoint
+      // status with the rdd when create ShuffleMapTask or ResultTask.
--- End diff --

I'd reword this a bit:

taskBinaryBytes and partitions are both affected by the checkpoint status. We need this synchronization in case another concurrent job is checkpointing this RDD, so we get a consistent view of both variables.
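
(As a sketch of what that looks like in context -- the `stage match` body is from the quoted diff; the final shape of the patch may differ slightly:)

```scala
// taskBinaryBytes and partitions are both affected by the checkpoint status. We need
// this synchronization in case another concurrent job is checkpointing this RDD, so we
// get a consistent view of both variables.
RDDCheckpointData.synchronized {
  taskBinaryBytes = stage match {
    case stage: ShuffleMapStage =>
      JavaUtils.bufferToArray(
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
    case stage: ResultStage =>
      JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
  }
  partitions = stage.rdd.partitions
}
```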


---




[GitHub] spark issue #20462: [SPARK-23020][core] Fix another race in the in-process l...

2018-02-02 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20462
  
bq. Is there a release notes / ki kinda thing for Spark releases?

not that I know of -- I was just thinking of putting it in the jira; I think that is the best thing users have to search. I know it's not great, but it's something. The current bug description doesn't hint at this at all.


---




[GitHub] spark issue #20462: [SPARK-23020][core] Fix another race in the in-process l...

2018-02-02 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20462
  
lgtm

The fix here makes sense to me, and I see how it breaks the test. I'm just wondering, do we need to document this at all for users, e.g. just clearly describe it in the jira? I realize most users will never hit this, as it only affects super-short apps, but we could say that for very short apps it's possible they never enter the FINISHED state but instead go to LOST, even though the app finished successfully.


---




[GitHub] spark issue #20422: [SPARK-23253][Core][Shuffle]Only write shuffle temporary...

2018-02-02 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20422
  
merged to master.

thanks @yaooqinn for doing and updating the tests too


---




[GitHub] spark issue #19041: [SPARK-21097][CORE] Add option to recover cached data

2018-02-02 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19041
  
Thought some more about the race between `RemoveBlock` getting sent back from the executor vs. when the `CacheRecoveryManager` tries to replicate the next block -- actually, why is there the back-and-forth with the driver for every block? Why isn't there just one message from the `CacheRecoveryManager` to the executor, saying "Drain all RDD blocks", and then one message from the executor back to the driver when it's done?
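
(Roughly, as a sketch -- these message names are hypothetical and don't exist in BlockManagerMessages:)

```scala
// One ask per executor, and one reply once its cache is drained, instead of a
// driver round trip per block.
case class DrainAllRDDBlocks(excludeExecutors: Seq[String])
case class AllRDDBlocksDrained(execId: String)
```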


---




[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r165482549
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -420,63 +432,53 @@ private[spark] class ExecutorAllocationManager(
    * Request the cluster manager to remove the given executors.
    * Returns the list of executors which are removed.
    */
-  private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized {
-    val executorIdsToBeRemoved = new ArrayBuffer[String]
-
+  private def removeExecutors(executors: Seq[String]): Unit = synchronized {
--- End diff --

the `Returns ...` line in the doc is wrong now that this returns `Unit`


---




[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r165488466
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -246,6 +251,38 @@ class BlockManagerMasterEndpoint(
     blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
+  private def recoverLatestRDDBlock(
+      execId: String,
+      excludeExecutors: Seq[String],
+      context: RpcCallContext): Unit = {
+    logDebug(s"Replicating first cached block on $execId")
+    val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
+    val response: Option[Future[Boolean]] = for {
+      blockManagerId <- blockManagerIdByExecutor.get(execId)
+      info <- blockManagerInfo.get(blockManagerId)
+      blocks = info.cachedBlocks.collect { case r: RDDBlockId => r }
+      // we assume blocks from the latest rdd are most relevant
+      firstBlock <- if (blocks.isEmpty) None else Some(blocks.max[RDDBlockId](Ordering.by(_.rddId)))
+      replicaSet <- blockLocations.asScala.get(firstBlock)
+      // Add 2 below because you need the number of replicas, plus one for the original, plus one
+      // for the new replica.
+      maxReps = replicaSet.size + 2
+    } yield info.slaveEndpoint
+      .ask[Boolean](ReplicateBlock(firstBlock, replicaSet.toSeq, excluded, maxReps))
+      .flatMap { success =>
+        if (success) {
+          logTrace(s"Replicated block $firstBlock on executor $execId")
+          replicaSet -= blockManagerId
+          info.slaveEndpoint.ask[Boolean](RemoveBlock(firstBlock))
--- End diff --

if I understand right, in order for the next iteration to avoid trying to remove the same block over again, you need this call to update the `info.cachedBlocks` used above. But I think that update is async -- even after this future has completed, it doesn't mean `info.cachedBlocks` has been updated.

I think what will happen in that case is that on the next iteration you'll try to remove the exact same block, which will fail on the executor because it's already been removed, and then back on the driver you'll decide to stop trying to replicate the rest of the blocks because of this apparent failure.


---




[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r165490041
  
--- Diff: core/src/test/scala/org/apache/spark/CacheRecoveryManagerSuite.scala ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.concurrent.{Future, Promise}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.reflect.ClassTag
+
+import org.mockito.Mockito._
+import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually
+import org.scalatest.mockito.MockitoSugar
+import org.scalatest.time.{Millis, Span}
+
+import org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
+import org.apache.spark.rpc._
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ThreadUtils
+
+class CacheRecoveryManagerSuite
+  extends SparkFunSuite with MockitoSugar with Matchers with Eventually {
+
+  val oneGB: Long = 1024L * 1024L * 1024L * 1024L
+  val plentyOfMem = Map(
+    BlockManagerId("1", "host", 12, None) -> ((oneGB, oneGB)),
+    BlockManagerId("2", "host", 12, None) -> ((oneGB, oneGB)),
+    BlockManagerId("3", "host", 12, None) -> ((oneGB, oneGB)))
+
+  test("CacheRecoveryManager will replicate blocks until empty and then kill executor") {
+    val conf = new SparkConf()
+    val eam = mock[ExecutorAllocationManager]
+    val blocks = Seq(RDDBlockId(1, 1), RDDBlockId(2, 1))
+    val bmme = FakeBMM(1, blocks.iterator, plentyOfMem)
+    val bmmeRef = DummyRef(bmme)
+    val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf)
+    when(eam.killExecutors(Seq("1"))).thenReturn(Seq("1"))
+    val result = cacheRecoveryManager.startCacheRecovery(Seq("1"))
+
+    eventually {
+      verify(eam).killExecutors(Seq("1"))
+      bmme.replicated.get("1").get shouldBe 2
+    }
+
+    cleanup(result, cacheRecoveryManager)
+  }
+
+  test("CacheRecoveryManager will kill executor if it takes too long to replicate") {
+    val conf = new SparkConf().set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "1s")
+    val eam = mock[ExecutorAllocationManager]
+    val blocks = Set(RDDBlockId(1, 1), RDDBlockId(2, 1), RDDBlockId(3, 1), RDDBlockId(4, 1))
+    val bmme = FakeBMM(600, blocks.iterator, plentyOfMem)
+    val bmmeRef = DummyRef(bmme)
+    val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf)
+    val result = cacheRecoveryManager.startCacheRecovery(Seq("1"))
+
+    eventually(timeout(Span(1010, Millis)), interval(Span(500, Millis))) {
--- End diff --

Also you can skip the `pause` and instead tell your FakeBMM to just stop 
responding after N blocks -- eg., make it only respond to the first 2 calls to 
`RecoverLatestRDDBlock` and just ignore the rest.  (You're still waiting on the 
kill timer, though)


---




[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r165486679
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
[... same diff hunk as quoted in full above ...]
+      blocks = info.cachedBlocks.collect { case r: RDDBlockId => r }
+      // we assume blocks from the latest rdd are most relevant
+      firstBlock <- if (blocks.isEmpty) None else Some(blocks.max[RDDBlockId](Ordering.by(_.rddId)))
--- End diff --

you can do `blocks.maxBy(_.rddId)`
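
(The two forms are equivalent -- both throw on an empty collection, which the `isEmpty` guard above already rules out:)

```scala
val byOrdering = blocks.max[RDDBlockId](Ordering.by(_.rddId))
val byMaxBy = blocks.maxBy(_.rddId)
```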


---




[GitHub] spark issue #20474: [SPARK-23235][Core] Add executor Threaddump to api

2018-02-01 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20474
  
Jenkins, ok to test


---




[GitHub] spark issue #20474: [SPARK-23235][Core] Add executor Threaddump to api

2018-02-01 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20474
  
Jenkins, add to whitelist


---




[GitHub] spark pull request #20474: [SPARK-23235][Core] Add executor Threaddump to ap...

2018-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20474#discussion_r165379694
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala ---
@@ -51,6 +52,21 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
   @Path("executors")
   def executorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(true))
 
+  @GET
+  @Path("executors/{executorId}/threads")
+  def threadDump(@PathParam("executorId") executorId: String):
+  Option[Array[ThreadStackTrace]] = withUI { ui =>
+    val safeExecutorId =
+      Option(UIUtils.stripXSS(executorId)).map { executorId =>
+        UIUtils.decodeURLParameter(executorId)
+      }.getOrElse {
+        throw new IllegalArgumentException(s"Missing executorId parameter")
+      }
+    ui.sc.flatMap { sc =>
+      sc.getExecutorThreadDump(safeExecutorId)
--- End diff --

what happens if you give a bad executor Id? Looks like you'll just return an empty response, but I think an error might be more appropriate.

In fact it would be nice if the error distinguished between a totally bogus executorId vs. an executorId which is dead vs. calling this on the history server, where it's totally unavailable. Here's an example of that kind of error handling for the opposite case, where the endpoint is *only* available on the history server:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala?utf8=%E2%9C%93#L90-L95
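
(A sketch of that, assuming the `ServiceUnavailable` and `NotFoundException` helpers linked above are reusable here:)

```scala
def threadDump(@PathParam("executorId") executorId: String): Array[ThreadStackTrace] =
  withUI { ui =>
    val sc = ui.sc.getOrElse {
      // History server: the app is no longer running, so there are no live executors.
      throw new ServiceUnavailable("Thread dumps are only available on running applications.")
    }
    sc.getExecutorThreadDump(executorId).getOrElse {
      // Bogus or dead executor id.
      throw new NotFoundException(s"No thread dump available for executor $executorId.")
    }
  }
```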


---




[GitHub] spark pull request #20474: [SPARK-23235][Core] Add executor Threaddump to ap...

2018-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20474#discussion_r165377861
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala ---
@@ -51,6 +52,21 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
   @Path("executors")
   def executorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(true))
 
+  @GET
+  @Path("executors/{executorId}/threads")
+  def threadDump(@PathParam("executorId") executorId: String):
+  Option[Array[ThreadStackTrace]] = withUI { ui =>
--- End diff --

if this doesn't fit on one line, the style is to still put each param on its own line:

```scala
def threadDump(
    @PathParam("executorId") executorId: String): Option[Array[ThreadStackTrace]] = ...
```


---




[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...

2018-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20422#discussion_r165256701
  
--- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala ---
@@ -89,26 +96,39 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
     } {
       out2.close()
     }
-    resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2)
+    resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths2, dataTmp2)
+
+    assert(indexFile.length() === (lengths.length + 1) * 8)
     assert(lengths2.toSeq === lengths.toSeq)
     assert(dataFile.exists())
     assert(dataFile.length() === 30)
     assert(!dataTmp2.exists())
 
     // The dataFile should be the previous one
     val firstByte = new Array[Byte](1)
-    val in = new FileInputStream(dataFile)
+    val dataIn = new FileInputStream(dataFile)
     Utils.tryWithSafeFinally {
-      in.read(firstByte)
+      dataIn.read(firstByte)
     } {
-      in.close()
+      dataIn.close()
     }
     assert(firstByte(0) === 0)
 
+    // The index file should not change
+    val secondValueOffset = new Array[Byte](8)
+    val indexIn = new FileInputStream(indexFile)
+    Utils.tryWithSafeFinally {
+      indexIn.read(secondValueOffset)
+      indexIn.read(secondValueOffset)
+    } {
+      indexIn.close()
+    }
+    assert(secondValueOffset(7) === 10, "The index file should not change")
--- End diff --

minor: here and below, this would be clearer if you use `DataInputStream.readLong()` (no magic 7 offset, and you check the rest of the bytes):
```scala
val indexIn = new DataInputStream(new FileInputStream(indexFile))
Utils.tryWithSafeFinally {
  indexIn.readLong()  // first offset is always 0
  assert(10 === indexIn.readLong(), "The index file should not change")
} {
  indexIn.close()
}
```


---




[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-01-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r165259010
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
[... same diff hunk as quoted in full above ...]
+      blocks = info.cachedBlocks.collect { case r: RDDBlockId => r }
--- End diff --

in one comment, you mention that you are only replicating in-memory rdd blocks. But this will also replicate on-disk rdd blocks, I think. Do you want to replicate both?


---




[GitHub] spark pull request #20424: [Spark-23240][python] Better error message when e...

2018-01-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20424#discussion_r165255299
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -191,7 +191,20 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
         daemon = pb.start()
 
         val in = new DataInputStream(daemon.getInputStream)
-        daemonPort = in.readInt()
+        try {
+          daemonPort = in.readInt()
+        } catch {
+          case exc: EOFException =>
+            throw new IOException(s"No port number in $daemonModule's stdout")
+        }
+
+        // test that the returned port number is within a valid range.
+        // note: this does not cover the case where the port number
+        // is arbitrary data but is also coincidentally within range
+        if (daemonPort < 1 || daemonPort > 0xffff) {
--- End diff --

ah I see, I think you are worried about something other than what bruce and I thought. Your concern is that we might throw an exception for some values that are actually perfectly legitimate. Port 0 being special is a pretty standard thing -- it's mentioned in the constructor for ServerSocket:
https://docs.oracle.com/javase/7/docs/api/java/net/ServerSocket.html#ServerSocket%28int%29

which implies that you shouldn't ever open a Socket on port 0, though I don't see that officially documented. At least on my laptop, I get different errors if I try to connect to port 0 vs. just connecting to a bogus port:

```scala
scala> val s2 = new Socket("localhost", 1234)
java.net.ConnectException: Connection refused
  at java.net.PlainSocketImpl.socketConnect(Native Method)
  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
  at java.net.Socket.connect(Socket.java:589)
  at java.net.Socket.connect(Socket.java:538)
  at java.net.Socket.<init>(Socket.java:434)
  at java.net.Socket.<init>(Socket.java:211)
  ... 29 elided

scala> val s3 = new Socket("localhost", 0)
java.net.NoRouteToHostException: Can't assign requested address
  at java.net.PlainSocketImpl.socketConnect(Native Method)
  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
  at java.net.Socket.connect(Socket.java:589)
  at java.net.Socket.connect(Socket.java:538)
  at java.net.Socket.<init>(Socket.java:434)
  at java.net.Socket.<init>(Socket.java:211)
  ... 29 elided
```

so I think it's pretty safe to say that daemon.py (or whatever) shouldn't be passing back `0` as the port to bind to.

Still -- it is *clearly* safer to instead have the port written to some other file, or (another) socket, so that we wouldn't have to worry about the details of this error handling.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20461: [SPARK-23289][CORE]OneForOneBlockFetcher.DownloadCallbac...

2018-01-31 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20461
  
lgtm


---




[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...

2018-01-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r165214410
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java ---
@@ -126,4 +150,38 @@ private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
       }
     }
   }
+
+  private class DownloadCallback implements StreamCallback {
+
+    private WritableByteChannel channel = null;
+    private File targetFile = null;
+    private int chunkIndex;
+
+    public DownloadCallback(File targetFile, int chunkIndex) throws IOException {
+      this.targetFile = targetFile;
+      this.channel = Channels.newChannel(new FileOutputStream(targetFile));
+      this.chunkIndex = chunkIndex;
+    }
+
+    @Override
+    public void onData(String streamId, ByteBuffer buf) throws IOException {
+      channel.write(buf);
--- End diff --

right, I realize there isn't a simple one-line change here to switch to using spliceTo -- I was just wondering what the behavior is.

I actually thought zero-copy and off-heap were orthogonal -- any time netty gives you direct access to bytes, it has to be copied to user space, right?


---




[GitHub] spark pull request #20461: [SPARK-23289][CORE]OneForOneBlockFetcher.Download...

2018-01-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20461#discussion_r165213755
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java ---
@@ -171,7 +171,9 @@ private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
 
     @Override
     public void onData(String streamId, ByteBuffer buf) throws IOException {
-      channel.write(buf);
+      while (buf.hasRemaining()) {
+        channel.write(buf);
--- End diff --

I actually thought this was OK for a FileChannel (just based on this comment in the original: https://github.com/apache/spark/pull/16989/files#r115409001), but certainly this seems like a safe change. Just wondering if you observed an issue, or if this is just to be safe.
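
(For context, the `WritableByteChannel.write` contract only promises that *some* bytes get written per call -- `FileChannel` usually writes everything, but the general-purpose pattern is a drain loop. A minimal standalone sketch:)

```scala
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// Loop until the buffer is empty; a single write() may consume fewer bytes
// than buf.remaining(), depending on the channel implementation.
def writeFully(channel: WritableByteChannel, buf: ByteBuffer): Unit = {
  while (buf.hasRemaining) {
    channel.write(buf)
  }
}
```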


---




[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-01-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r165207476
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
[... same diff hunk as quoted in full above ...]
+      // we assume blocks from the latest rdd are most relevant
--- End diff --

On first read I thought you were _only_ replicating the latest rdd.  Maybe 
expand this to

As a heuristic, prioritize replicating the latest rdd.  If this succeeds, 
CacheRecoveryManager will try to replicate the remaining rdds.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-01-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r165207798
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
[... same diff hunk as quoted in full above ...]
+      firstBlock <- if (blocks.isEmpty) None else Some(blocks.max[RDDBlockId](Ordering.by(_.rddId)))
+      replicaSet <- blockLocations.asScala.get(firstBlock)
+      // Add 2 below because you need the number of replicas, plus one for the original, plus one
+      // for the new replica.
+      maxReps = replicaSet.size + 2
--- End diff --

doesn't blockLocations already include the original?


---




[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-01-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r165210078
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Success => Succ}
+import scala.util.Failure
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.{BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executor's cached blocks, and then shutting
+ * it down.
+ */
+final private class CacheRecoveryManager(
+    blockManagerMasterEndpoint: RpcEndpointRef,
+    executorAllocationManager: ExecutorAllocationManager,
+    conf: SparkConf)
+  extends Logging {
+
+  private val forceKillAfterS = conf.get(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT)
+  private val threadPool = ThreadUtils.newDaemonCachedThreadPool("cache-recovery-manager-pool")
+  private implicit val asyncExecutionContext: ExecutionContext =
+    ExecutionContext.fromExecutorService(threadPool)
+  private val scheduler =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("cache-recovery-shutdown-timers")
+  private val recoveringExecutors: mutable.Set[String] =
+    ConcurrentHashMap.newKeySet[String]().asScala
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   */
+  def startCacheRecovery(execIds: Seq[String]): Unit = {
+    logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.")
+    val canBeRecovered = checkMem(execIds)
+    recoveringExecutors ++= canBeRecovered
+    val executorsWithKillTimers = canBeRecovered.zip(canBeRecovered.map(startKillTimer))
+    executorsWithKillTimers.foreach((replicateUntilDone _).tupled)
+  }
+
+  /**
+   * Given a list of executors that will be shut down, check if there is enough free memory on the
+   * rest of the cluster to hold their data. Return a list of just the executors for which there
+   * will be enough space. Executors are included smallest first.
+   *
+   * @param execIds executors which will be shut down
+   * @return a Seq of the executors we do have room for
+   */
+  private def checkMem(execIds: Seq[String]): Seq[String] = {
+    val execsToShutDown = execIds.toSet
+    // Memory Status is a map of executor Id to a tuple of Max Memory and remaining memory on that
+    // executor.
+    val allExecMemStatus: Map[String, (Long, Long)] = blockManagerMasterEndpoint
+      .askSync[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
+      .map { case (blockManagerId, mem) => blockManagerId.executorId -> mem }
+
+    val (expiringMemStatus, remainingMemStatus) = allExecMemStatus.partition {
+      case (execId, _) => execsToShutDown.contains(execId)
+    }
+    val freeMemOnRemaining = remainingMemStatus.values.map(_._2).sum
+
+    // The used mem on each executor sorted from least used mem to greatest
+    val executorAndUsedMem: Seq[(String, Long)] =
+      expiringMemStatus.map { case (execId, (maxMem, remainingMem)) =>
+        val usedMem = maxMem - remainingMem
+        execId -> usedMem
+      }.toSeq.sortBy { case (_, usedMem) => usedMem }
+
+    executorAndUsedMem
+      .scan((

[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...

2018-01-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r165178844
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java ---
[... same diff hunk as quoted in full above ...]
+    @Override
+    public void onData(String streamId, ByteBuffer buf) throws IOException {
+      channel.write(buf);
--- End diff --

I am super-late on reviewing this, apologies, just asking questions for my 
own understanding, and to consider possible future improvements -- this won't 
do a zero-copy transfer, will it?  That ByteBuffer is still in user space?

From my understanding, we'd need to do special handling to use netty's 
`spliceTo` when possible:

https://stackoverflow.com/questions/30322957/is-there-transferfrom-like-functionality-in-netty-for-zero-copy

but I'm still working on putting all the pieces together here and 
admittedly this is out of my area of expertise


---




[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...

2018-01-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20422#discussion_r165161260
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
 ---
@@ -133,4 +133,65 @@ class IndexShuffleBlockResolverSuite extends 
SparkFunSuite with BeforeAndAfterEa
 }
 assert(firstByte2(0) === 2)
   }
+
+  test("SPARK-23253: index files should be created properly") {
--- End diff --

thanks for adding this, but actually I'm not sure this covers any cases beyond the previous test, does it?  I was thinking of just adding something to read the actual index file, and make sure it had the right values to go with the update to the data file (or no updates in some cases).

you may have added a couple more asserts than the original test -- if so, 
maybe they can just be added to the original?
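
(A rough sketch of the check I mean -- the helper below is mine, and it assumes the index file layout of (numPartitions + 1) cumulative offsets stored as longs:)

```scala
import java.io.{DataInputStream, File, FileInputStream}

// Read the offsets back out of an index file so the test can compare them
// against the partition lengths written to the data file.
def readIndexOffsets(indexFile: File, numPartitions: Int): Seq[Long] = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try (0 to numPartitions).map(_ => in.readLong()) finally in.close()
}

// e.g. for lengths Array(8L, 8L) we'd expect offsets Seq(0L, 8L, 16L)
```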


---




[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...

2018-01-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20422#discussion_r165159188
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
 ---
@@ -133,4 +133,65 @@ class IndexShuffleBlockResolverSuite extends 
SparkFunSuite with BeforeAndAfterEa
 }
 assert(firstByte2(0) === 2)
   }
+
+  test("SPARK-23253: index files should be created properly") {
+val shuffleId = 1
+val mapId = 2
+val idxName = s"shuffle_${shuffleId}_${mapId}_0.index"
+val resolver = new IndexShuffleBlockResolver(conf, blockManager)
+
+val lengths = (1 to 2).map(_ => 8L).toArray
--- End diff --

you could do `Array.fill(2)(8L)`


---




[GitHub] spark issue #20422: [SPARK-23253][Core][Shuffle]Only write shuffle temporary...

2018-01-31 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20422
  
@jerryshao are you ok with making this change?  I think our original comments crossed paths as I was taking a closer look


---




[GitHub] spark pull request #20424: [Spark-23240][python] Better error message when e...

2018-01-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20424#discussion_r165116866
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -191,7 +191,20 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
         daemon = pb.start()
 
         val in = new DataInputStream(daemon.getInputStream)
-        daemonPort = in.readInt()
+        try {
+          daemonPort = in.readInt()
+        } catch {
+          case exc: EOFException =>
+            throw new IOException(s"No port number in $daemonModule's stdout")
+        }
+
+        // test that the returned port number is within a valid range.
+        // note: this does not cover the case where the port number
+        // is arbitrary data but is also coincidentally within range
+        if (daemonPort < 1 || daemonPort > 0xffff) {
--- End diff --

yeah I could go either way on this.  Personally I think just the enhanced error message you have above would be useful, without going through the added trouble of using another approach.  I'll defer to your opinion @HyukjinKwon


---




[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...

2018-01-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20408#discussion_r164874798
  
--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -423,7 +425,8 @@ private class LiveStage extends LiveEntity {
   newAccumulatorInfos(info.accumulables.values),
   None,
   None,
-  killedSummary)
+  killedSummary,
+  blackListedExecutors)
--- End diff --

oh hmm, I had actually just meant storing this as a variable in `LiveStage`, but not including it in `v1.StageData`.  Do you think it's useful to have this in the API itself?  opinions @tgravescs ?  it's already present as a per-executor flag, but there is no summary for the entire stage to give a list of the blacklisted executors.  Just having it in the `LiveStage` is enough to get the performance improvement I had mentioned earlier.


---




[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...

2018-01-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20408#discussion_r164886701
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -228,9 +230,12 @@ private[spark] class AppStatusListener(
 
 // Implicitly blacklist every available executor for the stage 
associated with this node
 Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach 
{ stage =>
-  liveExecutors.values.filter(_.host == event.hostId).foreach { exec =>
-setStageBlackListStatus(stage, exec.executorId, now)
+  val executorIds = liveExecutors.values.filter(_.host == 
event.hostId).map(_.executorId)
+  executorIds.foreach { executorId =>
+setStageBlackListStatus(stage, executorId, now)
   }
+  stage.blackListedExecutors ++= executorIds
+  maybeUpdate(stage, now)
--- End diff --

if `setStageBlacklistStatus` took a Seq or varargs of executorIds, you could push these other updates down into it.


---




[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...

2018-01-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20408#discussion_r164824439
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -594,12 +606,24 @@ private[spark] class AppStatusListener(
 
   stage.executorSummaries.values.foreach(update(_, now))
   update(stage, now, last = true)
+
+  val executorIdsForStage = stage.executorSummaries.keySet
+  executorIdsForStage.foreach { executorId =>
+liveExecutors.get(executorId).foreach { exec =>
+  removeBlackListedStageFrom(exec, event.stageInfo.stageId, now)
+}
+  }
 }
 
 appSummary = new AppSummary(appSummary.numCompletedJobs, 
appSummary.numCompletedStages + 1)
 kvstore.write(appSummary)
   }
 
+  private def removeBlackListedStageFrom(exec: LiveExecutor, stageId: Int, 
now: Long) = {
+exec.blacklistedInStages -= stageId
+liveUpdate(exec, now)
--- End diff --

hmm actually I just thought of something else.  It looks like you're calling `liveUpdate` here for *every* executor when the stage finishes.  Say you have 1000 execs, a very quick stage, and no blacklisting: that is an expensive update for no actual change.

So you should at least avoid the `liveUpdate` if `exec.blacklistedInStages` 
hasn't changed at all.  But really, I think that `LiveStage` should maintain a 
set of blacklisted executors, so you avoid calling this entirely for execs 
which aren't blacklisted.


---




[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...

2018-01-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20408#discussion_r164774722
  
--- Diff: 
core/src/main/resources/org/apache/spark/ui/static/executorspage.js ---
@@ -416,8 +422,7 @@ $(document).ready(function () {
 },
 {data: 'hostPort'},
 {data: 'isActive', render: function (data, type, 
row) {
-if (row.isBlacklisted) return "Blacklisted";
-else return formatStatus (data, type);
+return formatStatus (data, type, row);
--- End diff --

ok nevermind -- now I see the old code didn't seem to follow the convention in the rest of this file.  Not sure about the style guide for js, but I think this might be a bit cleaner if we put each element on its own line, so the nesting is clearer, like it is here:


https://github.com/apache/spark/blob/master/core/src/main/resources/org/apache/spark/ui/static/executorspage.js#L459-L476


---




[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...

2018-01-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20408#discussion_r164766057
  
--- Diff: 
core/src/main/resources/org/apache/spark/ui/static/executorspage.js ---
@@ -416,8 +422,7 @@ $(document).ready(function () {
 },
 {data: 'hostPort'},
 {data: 'isActive', render: function (data, type, 
row) {
-if (row.isBlacklisted) return "Blacklisted";
-else return formatStatus (data, type);
+return formatStatus (data, type, row);
--- End diff --

weird indentation here?


---




[GitHub] spark pull request #20424: [Spark-23240][python] Better error message when e...

2018-01-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20424#discussion_r164765247
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -191,7 +191,20 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
         daemon = pb.start()
 
         val in = new DataInputStream(daemon.getInputStream)
-        daemonPort = in.readInt()
+        try {
+          daemonPort = in.readInt()
+        } catch {
+          case exc: EOFException =>
+            throw new IOException(s"No port number in $daemonModule's stdout")
+        }
+
+        // test that the returned port number is within a valid range.
+        // note: this does not cover the case where the port number
+        // is arbitrary data but is also coincidentally within range
+        if (daemonPort < 1 || daemonPort > 0xffff) {
--- End diff --

yeah this is kind of what I was getting at below -- what value are we 
adding with this extra handling, over the original exception?

Another possibility is to change this to not use stdout, but that adds more complexity.  You could use sockets, or write the port to some dedicated temporary file.
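
(Roughly like this, to sketch the temp-file variant -- the names and mechanism here are hypothetical, not something this PR does:)

```scala
import java.io.File
import scala.io.Source

// The launcher creates a dedicated file, hands its path to the daemon (e.g.
// through an env var in pb.environment()), and polls until the port shows up.
val portFile = File.createTempFile("pyspark-daemon-port", ".txt")

def readDaemonPort(deadlineMs: Long): Int = {
  while (portFile.length() == 0 && System.currentTimeMillis() < deadlineMs) {
    Thread.sleep(10)  // wait for the daemon to write its port
  }
  Source.fromFile(portFile).mkString.trim.toInt
}
```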


---




[GitHub] spark pull request #20424: [Spark-23240][python] Better error message when e...

2018-01-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20424#discussion_r164637553
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -191,7 +191,20 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
         daemon = pb.start()
 
         val in = new DataInputStream(daemon.getInputStream)
-        daemonPort = in.readInt()
+        try {
+          daemonPort = in.readInt()
+        } catch {
+          case exc: EOFException =>
+            throw new IOException(s"No port number in $daemonModule's stdout")
+        }
+
+        // test that the returned port number is within a valid range.
+        // note: this does not cover the case where the port number
+        // is arbitrary data but is also coincidentally within range
+        if (daemonPort < 1 || daemonPort > 0xffff) {
+          throw new IOException(s"Bad port number in $daemonModule's stdout: " +
+            f"0x$daemonPort%08x")
--- End diff --

just a thought:

this error message won't mean much to the typical user.  Would it be sensible to tell the user exactly what python command to run themselves to figure out the problem?  E.g. "unexpected stdout from /foo/bar/some/path/to/python -m /path/to/daemon.py".  That's what would help with that sitecustomize.py case.  Or is it not useful in general?


---




[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...

2018-01-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20422#discussion_r164635886
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -166,8 +153,20 @@ private[spark] class IndexShuffleBlockResolver(
   if (dataTmp != null && dataTmp.exists()) {
 dataTmp.delete()
   }
-  indexTmp.delete()
 } else {
+  val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
--- End diff --

move this below the comment "This is the first successful attempt".

I'd also include a comment about why we write to a temporary file, even 
though we're always going to rename (because in case the task dies somehow, 
we'd prefer to not leave a half-written index file in the final location).
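
(Something along these lines -- the wording is only a suggestion:)

```scala
// Write to a temp file first and rename it into place at the end, even though
// we always rename: if the task dies mid-write, we'd rather not leave a
// half-written index file at the final location for a reader to pick up.
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
```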


---




[GitHub] spark issue #20422: [SPARK-23253][Core][Shuffle]Only write shuffle temporary...

2018-01-29 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20422
  
Jenkins, ok to test


---




[GitHub] spark issue #20422: [SPARK-23253][Core][Shuffle]Only write shuffle temporary...

2018-01-29 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20422
  
thanks for taking a look at this @yaooqinn.  To clarify -- there is no bug you are trying to fix here, is there?  It's just an optimization?  From a quick glance I think the change seems correct ... but it also seems like such a minor improvement that I'm not sure I see the value in changing this.


---




[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...

2018-01-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20408#discussion_r164627546
  
--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -254,6 +255,7 @@ private class LiveExecutor(val executorId: String, 
_addTime: Long) extends LiveE
   var totalShuffleRead = 0L
   var totalShuffleWrite = 0L
   var isBlacklisted = false
+  var blacklistedInStages: Set[Int] = TreeSet()
--- End diff --

yeah thats a good reason, sorry I should have thought of that!


---




[GitHub] spark issue #20408: [SPARK-23189][Core][Web UI] Reflect stage level blacklis...

2018-01-29 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20408
  
code looks good to me, but let's see what @tgravescs @ajbozarth say.

@ajbozarth it is wordy, but I think `Active (Blacklisted in Stages: [...])` 
is probably the best of the options so far.


---




[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...

2018-01-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20408#discussion_r164584678
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -594,12 +606,24 @@ private[spark] class AppStatusListener(
 
   stage.executorSummaries.values.foreach(update(_, now))
   update(stage, now, last = true)
+
+  val executorIdsForStage = stage.executorSummaries.keySet
+  executorIdsForStage.foreach { executorId =>
+liveExecutors.get(executorId).foreach { exec =>
+  removeBlackListedStageFrom(exec, event.stageInfo.stageId, now)
--- End diff --

whoops, my fault!  thanks for explaining


---




[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...

2018-01-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20408#discussion_r164588595
  
--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -254,6 +255,7 @@ private class LiveExecutor(val executorId: String, 
_addTime: Long) extends LiveE
   var totalShuffleRead = 0L
   var totalShuffleWrite = 0L
   var isBlacklisted = false
+  var blacklistedInStages: Set[Int] = TreeSet()
--- End diff --

any particular reason you chose `TreeSet`, and not just `scala.collection.immutable.Set`?  (scala's default implementation is specialized for a small number of items, then switches to a `HashSet`) -- see the use of `Map` as the implementation for `executorLogs`.
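
(To illustrate the default behavior I mean -- a small sketch; the printed class names are what scala 2.11/2.12 give:)

```scala
// The default immutable Set uses specialized Set1..Set4 classes for up to
// four elements, and only allocates a hash-based set beyond that.
val small = Set(1, 2, 3)
println(small.getClass.getName)   // scala.collection.immutable.Set$Set3
val bigger = Set(1, 2, 3, 4, 5)
println(bigger.getClass.getName)  // a HashSet variant
```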


---




[GitHub] spark issue #20161: [SPARK-21525][streaming] Check error code from superviso...

2018-01-29 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20161
  
lgtm


---




[GitHub] spark issue #20399: [SPARK-23209][core] Allow credential manager to work whe...

2018-01-29 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20399
  
I am merging this now to master & 2.3


---




[GitHub] spark pull request #20399: [SPARK-23209][core] Allow credential manager to w...

2018-01-26 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20399#discussion_r164223721
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 ---
@@ -75,6 +75,17 @@ private[spark] class HadoopDelegationTokenManager(
   .toMap
   }
 
+  private def safeCreateProvider(
+  createFn: => HadoopDelegationTokenProvider): 
Option[HadoopDelegationTokenProvider] = {
+try {
+  Some(createFn)
+} catch {
+  case t: Throwable =>
+logDebug(s"Failed to load built in provider.", t)
--- End diff --

I think debug is right, actually -- we have no idea at this point if the user wants these credential providers, and it could be totally fine if they're missing, e.g. if they never want to talk to hive.

(also don't really care that much and don't want to bike-shed on this ...)


---




[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...

2018-01-26 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20408#discussion_r164177919
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -594,12 +606,24 @@ private[spark] class AppStatusListener(
 
   stage.executorSummaries.values.foreach(update(_, now))
   update(stage, now, last = true)
+
+  val executorIdsForStage = stage.executorSummaries.keySet
+  executorIdsForStage.foreach { executorId =>
+liveExecutors.get(executorId).foreach { exec =>
+  removeBlackListedStageFrom(exec, event.stageInfo.stageId, now)
--- End diff --

I'm just doing a really quick scan here, but I don't understand why changes 
here are necessary.  You don't get these events for blacklisting within a stage.

Or is this a bug in the current code, and something we should fix in the pending 2.3 release?


---




[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...

2018-01-26 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20408#discussion_r164175040
  
--- Diff: 
core/src/main/resources/org/apache/spark/ui/static/executorspage.js ---
@@ -25,9 +25,13 @@ function getThreadDumpEnabled() {
 return threadDumpEnabled;
 }
 
-function formatStatus(status, type) {
+function formatStatus(status, type, row) {
--- End diff --

can you move the

```js
if (row.isBlacklisted) return "Blacklisted";
```
into this function as well?  no reason for it to be separated.


---




[GitHub] spark pull request #20399: [SPARK-23209][core] Allow credential manager to w...

2018-01-25 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20399#discussion_r163989326
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
 ---
@@ -110,7 +111,64 @@ class HadoopDelegationTokenManagerSuite extends 
SparkFunSuite with Matchers {
 creds.getAllTokens.size should be (0)
   }
 
+  test("SPARK-23209: obtain tokens when Hive classes are not available") {
+    // This test needs a custom class loader to hide Hive classes which are in the classpath.
+    // Because the manager code loads the Hive provider directly instead of using reflection, we
+    // need to drive the test through the custom class loader so a new copy that cannot find
+    // Hive classes is loaded.
+    val currentLoader = Thread.currentThread().getContextClassLoader()
+    val noHive = new ClassLoader() {
+      override def loadClass(name: String, resolve: Boolean): Class[_] = {
+        if (name.startsWith("org.apache.hive") || name.startsWith("org.apache.hadoop.hive")) {
+          throw new ClassNotFoundException(name)
+        }
+
+        if (name.startsWith("java") || name.startsWith("scala")) {
+          currentLoader.loadClass(name)
+        } else {
+          val classFileName = name.replaceAll("\\.", "/") + ".class"
+          val in = currentLoader.getResourceAsStream(classFileName)
+          if (in != null) {
+            val bytes = IOUtils.toByteArray(in)
+            defineClass(name, bytes, 0, bytes.length)
+          } else {
+            throw new ClassNotFoundException(name)
+          }
+        }
+      }
+    }
+
+    try {
+      Thread.currentThread().setContextClassLoader(noHive)
+      val test = noHive.loadClass(NoHiveTest.getClass.getName().stripSuffix("$"))
+      test.getMethod("main", classOf[Array[String]]).invoke(null, Array[String]())
+    } finally {
+      Thread.currentThread().setContextClassLoader(currentLoader)
+    }
+  }
+
   private[spark] def hadoopFSsToAccess(hadoopConf: Configuration): 
Set[FileSystem] = {
 Set(FileSystem.get(hadoopConf))
   }
 }
+
+/** Test code for SPARK-23209 to avoid using too much reflection above. */
+private object NoHiveTest extends Matchers {
+
+  def main(args: Array[String]): Unit = {
--- End diff --

super minor: can you name this something other than "main"?  it makes it seem like you're launching it as a separate process (maybe a leftover from an earlier attempt?)


---




[GitHub] spark pull request #20399: [SPARK-23209][core] Allow credential manager to w...

2018-01-25 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20399#discussion_r163980133
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
 ---
@@ -110,7 +111,64 @@ class HadoopDelegationTokenManagerSuite extends 
SparkFunSuite with Matchers {
 creds.getAllTokens.size should be (0)
   }
 
+  test("SPARK-23209: obtain tokens when Hive classes are not available") {
+// This test needs a custom class loader to hide Hive classes which 
are in the classpath.
+// Because the manager code loads the Hive provider directly instead 
of using reflection, we
--- End diff --

just for my understanding, is there any reason the manager should load the code directly, rather than using reflection to guard against this?  I guess either way is fine; I had just seen us use reflection more often to guard against this.
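
(For reference, the reflection-style guard I had in mind looks roughly like this -- just a sketch; the provider class name is only an example and the no-arg constructor is an assumption:)

```scala
// Load the provider class by name so a missing Hive dependency surfaces as a
// catchable ClassNotFoundException / NoClassDefFoundError instead of a
// linkage error when the enclosing class is loaded.
def loadProviderViaReflection(className: String): Option[AnyRef] = {
  try {
    Some(Class.forName(className).getConstructor().newInstance().asInstanceOf[AnyRef])
  } catch {
    case _: ClassNotFoundException | _: NoClassDefFoundError => None
  }
}

val hiveProvider = loadProviderViaReflection(
  "org.apache.spark.deploy.security.HiveDelegationTokenProvider")
```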


---




[GitHub] spark issue #20203: [SPARK-22577] [core] executor page blacklist status shou...

2018-01-24 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20203
  
merged to master.  thanks @attilapiros 


---




[GitHub] spark issue #20203: [SPARK-22577] [core] executor page blacklist status shou...

2018-01-22 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20203
  
@attilapiros can you please update the PR description to also mention node 
blacklisting (at least briefly) and file a jira for the followup work, and ping 
me & tom on it?

lgtm


---




[GitHub] spark issue #20203: [SPARK-22577] [core] executor page blacklist status shou...

2018-01-22 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20203
  
I tried this out on a cluster and it seemed fine.  I also tried with a bad app, where all tasks fail (not just because of the host) and all the executors show up as blacklisted, but I guess that is fine.

```scala
sc.parallelize(1 to 1000, 10).map { x => throw new RuntimeException("bad application") }.count()
```

Screenshot: https://user-images.githubusercontent.com/71240/35247305-88bceb4c-ff8f-11e7-9e98-285ffd604f46.png

or about the same if there is just one bad task:

```scala
sc.parallelize(1 to 1000, 10).map { x =>
  if (TaskContext.get().partitionId() == 0) throw new RuntimeException("bad task")
  else 0
}.count()
```

of course if you have more executors, then a whole bunch of them show up as 
blacklisted, but still seems fine.


---




[GitHub] spark issue #19992: [SPARK-22805][CORE] Use StorageLevel aliases in event lo...

2018-01-22 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19992
  
thanks for looking into this @superbobry -- can you actually close this yourself?  we can't directly close it (there is a way but it's more complicated)


---




[GitHub] spark issue #20203: [SPARK-22577] [core] executor page blacklist status shou...

2018-01-19 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20203
  
@attilapiros test failures look real (you probably just need to regenerate 
some of those expectations).


---




[GitHub] spark issue #20203: [SPARK-22577] [core] executor page blacklist status shou...

2018-01-19 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20203
  
btw another way you could test out having a bad host would be something 
like this (untested):

```scala
import java.net.InetAddress

val hosts = sc.parallelize(1 to 10000, 100).map { _ =>
  InetAddress.getLocalHost.getHostName
}.collect().toSet
val badHost = hosts.head

sc.parallelize(1 to 10000, 10).map { x =>
  if (InetAddress.getLocalHost.getHostName == badHost) throw new RuntimeException("Bad host")
  else (x % 3, x)
}.reduceByKey((a, b) => a + b).collect()
```

that way you make sure the failures are consistently on one host, not 
dependent on higher executor ids getting concentrated on one host.


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r162714257
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala ---
@@ -59,31 +60,55 @@ class TaskSetBlacklistSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
   val shouldBeBlacklisted = (executor == "exec1" && index == 0)
   assert(taskSetBlacklist.isExecutorBlacklistedForTask(executor, index) === shouldBeBlacklisted)
 }
+
 assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
+verify(listenerBusMock, never())
+  .post(isA(classOf[SparkListenerExecutorBlacklistedForStage]))
+
 assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+verify(listenerBusMock, never())
+  .post(isA(classOf[SparkListenerNodeBlacklistedForStage]))
 
 // Mark task 1 failed on exec1 -- this pushes the executor into the blacklist
 taskSetBlacklist.updateBlacklistForFailedTask(
   "hostA", exec = "exec1", index = 1, failureReason = "testing")
+
 assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
-assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 verify(listenerBusMock).post(
   SparkListenerExecutorBlacklistedForStage(0, "exec1", 2, 0, attemptId))
+
+assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+verify(listenerBusMock, never())
+  .post(isA(classOf[SparkListenerNodeBlacklistedForStage]))
+
 // Mark one task as failed on exec2 -- not enough for any further blacklisting yet.
 taskSetBlacklist.updateBlacklistForFailedTask(
   "hostA", exec = "exec2", index = 0, failureReason = "testing")
 assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
+
 assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec2"))
+verify(listenerBusMock, never()).post(
+  SparkListenerNodeBlacklistedForStage(0, "hostA", 2, 0, attemptId))
+
 assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+verify(listenerBusMock, never())
+  .post(isA(classOf[SparkListenerNodeBlacklistedForStage]))
--- End diff --

the `verify` you add just above is redundant with this one, right?  I think you only need this one.


---




[GitHub] spark issue #20284: [SPARK-23103][core] Ensure correct sort order for negati...

2018-01-19 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20284
  
even though we don't *know* of this causing a bug in 2.3, I still think we 
should merge it in there just because there may be some case we aren't thinking 
of, and this is a relatively small, safe fix.

so, I'm merging to master & 2.3


---




[GitHub] spark issue #20138: [SPARK-20664][core] Delete stale application data from S...

2018-01-19 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20138
  
as RC1 failed and RC2 is going to be cut soon, I'm going to merge this to 
master & 2.3


---




[GitHub] spark issue #20319: [SPARK-22884][ML][TESTS] ML test for StructuredStreaming...

2018-01-19 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20319
  
Jenkins, add to whitelist


---




[GitHub] spark pull request #20284: [SPARK-23103][core] Ensure correct sort order for...

2018-01-17 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20284#discussion_r162166090
  
--- Diff: 
common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java 
---
@@ -112,7 +114,8 @@ public void setup() throws Exception {
   t.key = "key" + i;
   t.id = "id" + i;
   t.name = "name" + RND.nextInt(MAX_ENTRIES);
-  t.num = RND.nextInt(MAX_ENTRIES);
+  // Force one item to have an integer value of zero to test the fix for SPARK-23103.
+  t.num = (i != 0) ? (int) RND.nextLong() : 0;
--- End diff --

why the change from `RND.nextInt(MAX_ENTRIES)`?  this seems fine, it just seemed like you were more likely to stress collisions on this index before.


---




[GitHub] spark issue #20203: [SPARK-22577] [core] executor page blacklist status shou...

2018-01-17 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20203
  
that sounds fine with me


---




[GitHub] spark issue #20203: [SPARK-22577] [core] executor page blacklist status shou...

2018-01-17 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20203
  
yeah I think it's probably fine to update the executors page that way -- let's at least see how it looks.   Would the list include all stages ever blacklisted?  Only those stages still running?  The most recent 3 blacklisted stages?

@attilapiros how about you take a shot at updating the executors page as 
well, we can see what that looks like?


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-17 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r162087946
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala ---
@@ -128,13 +130,17 @@ private[scheduler] class TaskSetBlacklist(val conf: 
SparkConf, val stageId: Int,
 }
 
     // Check if enough tasks have failed on the executor to blacklist it for the entire stage.
-    if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
+    val numFailures = execFailures.numUniqueTasksWithFailures
+    if (numFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
       if (blacklistedExecs.add(exec)) {
         logInfo(s"Blacklisting executor ${exec} for stage $stageId")
         // This executor has been pushed into the blacklist for this stage.  Let's check if it
         // pushes the whole node into the blacklist.
         val blacklistedExecutorsOnNode =
           execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
+        val now = clock.getTimeMillis()
+        listenerBus.post(
+          SparkListenerExecutorBlacklistedForStage(now, exec, numFailures, stageId, stageAttemptId))
         if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
           if (blacklistedNodes.add(host)) {
             logInfo(s"Blacklisting ${host} for stage $stageId")
--- End diff --

yes that makes sense to me -- totally agree with your point about handling 
late updates.  After all, another executor can get added to the node at any 
time.


---




[GitHub] spark issue #20203: [SPARK-22577] [core] executor page blacklist status shou...

2018-01-16 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20203
  
attila attached some screenshots to the jira (showing both what happens 
with stage blacklisting and full application blacklisting).

The only change here is to the page for a specific stage, so it seems clear that it's saying the executor is blacklisted for that same stage.  OTOH, if we were to change the executors page as well, then you would need to put something to indicate which page ... I'm not sure what you would put, to be both useful and succinct.


---




[GitHub] spark issue #20138: [SPARK-20664][core] Delete stale application data from S...

2018-01-16 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20138
  
lgtm


---




[GitHub] spark issue #20203: [SPARK-22577] [core] executor page blacklist status shou...

2018-01-16 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20203
  
@ajbozarth maybe you have some thoughts on the UI, and whether it makes 
sense to put anything on the executors page?

@CodingCat you also often have good UI suggestions :)

thanks


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r161884194
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala ---
@@ -36,7 +36,9 @@ import org.apache.spark.util.Clock
  * [[TaskSetManager]] this class is designed only to be called from code 
with a lock on the
  * TaskScheduler (e.g. its event handlers). It should not be called from 
other threads.
  */
-private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int, val clock: Clock)
+private[scheduler] class TaskSetBlacklist(private val listenerBus: LiveListenerBus,
+  val conf: SparkConf, val stageId: Int,
--- End diff --

style: if it's multiline, each param goes on its own line, double-indented


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r161885726
  
--- Diff: 
core/src/test/resources/HistoryServerExpectations/stage_blacklisting_for_stage_expectation.json
 ---
@@ -0,0 +1,639 @@
+{
--- End diff --

nit: "stage" twice in the filename is confusing, how about just 
"blacklisting_for_stage_expectation.json"


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r161884916
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala ---
@@ -128,13 +130,17 @@ private[scheduler] class TaskSetBlacklist(val conf: 
SparkConf, val stageId: Int,
 }
 
     // Check if enough tasks have failed on the executor to blacklist it for the entire stage.
-    if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
+    val numFailures = execFailures.numUniqueTasksWithFailures
+    if (numFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
       if (blacklistedExecs.add(exec)) {
         logInfo(s"Blacklisting executor ${exec} for stage $stageId")
         // This executor has been pushed into the blacklist for this stage.  Let's check if it
         // pushes the whole node into the blacklist.
         val blacklistedExecutorsOnNode =
           execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
+        val now = clock.getTimeMillis()
+        listenerBus.post(
+          SparkListenerExecutorBlacklistedForStage(now, exec, numFailures, stageId, stageAttemptId))
         if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
           if (blacklistedNodes.add(host)) {
             logInfo(s"Blacklisting ${host} for stage $stageId")
--- End diff --

if we're going to do this for executors, we should do it for nodes too.  In the UI, you'd just show for each executor that it was blacklisted for the stage; I don't think you would need to distinguish whether it was blacklisted b/c of the entire node, or just the one executor.


---




[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r161885207
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -211,6 +211,11 @@ private[spark] class AppStatusListener(
 updateBlackListStatus(event.executorId, true)
   }
 
+  override def onExecutorBlacklistedForStage(
+event: SparkListenerExecutorBlacklistedForStage): Unit = {
--- End diff --

double-indent this line (4 spaces)


---




[GitHub] spark issue #20236: [SPARK-23044] Error handling for jira assignment

2018-01-16 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20236
  
@vanzin @jerryshao want to take another look?

Now it:
* filters out "apache spark"
* lets you enter an arbitrary id
* if there's an error, just prompts again

sample session: 
https://gist.github.com/squito/de73fbd0b9c00961377068b91283e04c


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-15 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r161660926
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ---
@@ -663,6 +665,95 @@ class FsHistoryProviderSuite extends SparkFunSuite 
with BeforeAndAfter with Matc
 freshUI.get.ui.store.job(0)
   }
 
+  test("clean up stale app information") {
+val storeDir = Utils.createTempDir()
+val conf = createTestConf().set(LOCAL_STORE_DIR, 
storeDir.getAbsolutePath())
+val provider = spy(new FsHistoryProvider(conf))
+val appId = "new1"
+
+// Write logs for two app attempts.
+doReturn(1L).when(provider).getNewLastScanTime()
+val attempt1 = newLogFile(appId, Some("1"), inProgress = false)
+writeFile(attempt1, true, None,
+  SparkListenerApplicationStart(appId, Some(appId), 1L, "test", 
Some("1")),
+  SparkListenerJobStart(0, 1L, Nil, null),
+  SparkListenerApplicationEnd(5L)
+  )
+val attempt2 = newLogFile(appId, Some("2"), inProgress = false)
+writeFile(attempt2, true, None,
+  SparkListenerApplicationStart(appId, Some(appId), 1L, "test", 
Some("2")),
+  SparkListenerJobStart(0, 1L, Nil, null),
+  SparkListenerApplicationEnd(5L)
+  )
+updateAndCheck(provider) { list =>
+  assert(list.size === 1)
+  assert(list(0).id === appId)
+  assert(list(0).attempts.size === 2)
+}
+
+// Load the app's UI.
+val ui = provider.getAppUI(appId, Some("1"))
+assert(ui.isDefined)
+
+// Delete the underlying log file for attempt 1 and rescan. The UI 
should go away, but since
+// attempt 2 still exists, listing data should be there.
+doReturn(2L).when(provider).getNewLastScanTime()
+attempt1.delete()
+updateAndCheck(provider) { list =>
+  assert(list.size === 1)
+  assert(list(0).id === appId)
+  assert(list(0).attempts.size === 1)
+}
+assert(!ui.get.valid)
+assert(provider.getAppUI(appId, None) === None)
+
+// Delete the second attempt's log file. Now everything should go away.
+doReturn(3L).when(provider).getNewLastScanTime()
+attempt2.delete()
+updateAndCheck(provider) { list =>
+  assert(list.isEmpty)
+}
+  }
+
+  test("SPARK-21571: clean up removes invalid history files") {
+val clock = new ManualClock(TimeUnit.DAYS.toMillis(120))
+val conf = createTestConf().set("spark.history.fs.cleaner.maxAge", 
s"2d")
+val provider = new FsHistoryProvider(conf, clock) {
+  override def getNewLastScanTime(): Long = clock.getTimeMillis()
+}
+
+// Create 0-byte size inprogress and complete files
+val logfile1 = newLogFile("emptyInprogressLogFile", None, inProgress = 
true)
+logfile1.createNewFile()
+logfile1.setLastModified(clock.getTimeMillis())
+
+val logfile2 = newLogFile("emptyFinishedLogFile", None, inProgress = 
false)
+logfile2.createNewFile()
+logfile2.setLastModified(clock.getTimeMillis())
+
+// Create an incomplete log file, has an end record but no start 
record.
+val logfile3 = newLogFile("nonEmptyCorruptLogFile", None, inProgress = 
false)
+writeFile(logfile3, true, None, SparkListenerApplicationEnd(0))
+logfile3.setLastModified(clock.getTimeMillis())
+
+provider.checkForLogs()
+provider.cleanLogs()
+assert(new File(testDir.toURI).listFiles().size === 3)
+
+// Move the clock forward 1 day and scan the files again. They should 
still be there.
+clock.advance(TimeUnit.DAYS.toMillis(1))
+provider.checkForLogs()
+provider.cleanLogs()
+assert(new File(testDir.toURI).listFiles().size === 3)
+
+// Move the clock forward another 2 days and scan the files again. 
This time the cleaner should
+// pick up the invalid files and get rid of them.
+clock.advance(TimeUnit.DAYS.toMillis(2))
+provider.checkForLogs()
+provider.cleanLogs()
+assert(new File(testDir.toURI).listFiles().size === 0)
--- End diff --

I think you should add a case where one file starts out empty, say even for 
one full day, but then becomes valid before the expiration time, and make sure 
it does *not* get cleaned up.
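
(Roughly like this, reusing the suite's helpers from the diff above -- untested, and the file/app names are placeholders:)

```scala
// Starts out as an empty in-progress file, and stays empty for a full day.
val logfile4 = newLogFile("emptyThenValid", None, inProgress = true)
logfile4.createNewFile()
logfile4.setLastModified(clock.getTimeMillis())
clock.advance(TimeUnit.DAYS.toMillis(1))
provider.checkForLogs()
provider.cleanLogs()

// Becomes a parseable app log before the 2-day expiration.
writeFile(logfile4, true, None,
  SparkListenerApplicationStart("appY", Some("appY"), clock.getTimeMillis(), "test", None))
logfile4.setLastModified(clock.getTimeMillis())

// A day later it must still be there: its last update is within maxAge, so
// the cleaner must *not* remove it even though it spent a day empty.
clock.advance(TimeUnit.DAYS.toMillis(1))
provider.checkForLogs()
provider.cleanLogs()
assert(new File(testDir.toURI).listFiles().exists(_.getName().contains("emptyThenValid")))
```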


---




[GitHub] spark issue #20056: [SPARK-22878] [CORE] Count totalDroppedEvents for LiveLi...

2018-01-15 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20056
  
I see that `LiveListenerBus.droppedEventsCounter` and `lastReportTimestamp` 
are unused, so it certainly makes sense to clean them up one way or the other 
-- but that might mean we should delete them, not that we necessarily need to 
do something else with them.

I could see an argument that there are already monitoring systems hooked up 
to the old metric, 
["numEventsDropped"](https://github.com/apache/spark/blob/718bbc939037929ef5b8f4b4fe10aadfbab4408e/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L266),
 so maybe we should bring back the total with that metric.

But do you really want even more logging of the total, beyond the logging 
from each queue?  Seems like it would only be more confusing to me.


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r161099778
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ---
@@ -663,6 +665,95 @@ class FsHistoryProviderSuite extends SparkFunSuite 
with BeforeAndAfter with Matc
 freshUI.get.ui.store.job(0)
   }
 
+  test("clean up stale app information") {
+val storeDir = Utils.createTempDir()
+val conf = createTestConf().set(LOCAL_STORE_DIR, 
storeDir.getAbsolutePath())
+val provider = spy(new FsHistoryProvider(conf))
+val appId = "new1"
+
+// Write logs for two app attempts.
+doReturn(1L).when(provider).getNewLastScanTime()
+val attempt1 = newLogFile(appId, Some("1"), inProgress = false)
+writeFile(attempt1, true, None,
+  SparkListenerApplicationStart(appId, Some(appId), 1L, "test", 
Some("1")),
+  SparkListenerJobStart(0, 1L, Nil, null),
+  SparkListenerApplicationEnd(5L)
+  )
+val attempt2 = newLogFile(appId, Some("2"), inProgress = false)
+writeFile(attempt2, true, None,
+  SparkListenerApplicationStart(appId, Some(appId), 1L, "test", 
Some("2")),
+  SparkListenerJobStart(0, 1L, Nil, null),
+  SparkListenerApplicationEnd(5L)
+  )
+updateAndCheck(provider) { list =>
+  assert(list.size === 1)
+  assert(list(0).id === appId)
+  assert(list(0).attempts.size === 2)
+}
+
+// Load the app's UI.
+val ui = provider.getAppUI(appId, Some("1"))
+assert(ui.isDefined)
+
+// Delete the underlying log file for attempt 1 and rescan. The UI 
should go away, but since
+// attempt 2 still exists, listing data should be there.
+doReturn(2L).when(provider).getNewLastScanTime()
+attempt1.delete()
+updateAndCheck(provider) { list =>
+  assert(list.size === 1)
+  assert(list(0).id === appId)
+  assert(list(0).attempts.size === 1)
+}
+assert(!ui.get.valid)
+assert(provider.getAppUI(appId, None) === None)
+
+// Delete the second attempt's log file. Now everything should go away.
+doReturn(3L).when(provider).getNewLastScanTime()
+attempt2.delete()
+updateAndCheck(provider) { list =>
+  assert(list.isEmpty)
+}
+  }
+
+  test("SPARK-21571: clean up removes invalid history files") {
+val clock = new ManualClock(TimeUnit.DAYS.toMillis(120))
--- End diff --

just curious, why start at 120 days?  (not that it matters ...)


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r161099310
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -834,6 +906,9 @@ private[history] case class FsHistoryProviderMetadata(
 
 private[history] case class LogInfo(
 @KVIndexParam logPath: String,
+@KVIndexParam("lastProcessed") lastProcessed: Long,
+appId: Option[String],
--- End diff --

also add a comment here explaining why appId is an Option, as that is unexpected


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r161098356
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -544,73 +621,75 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 bus.addListener(listener)
 replay(fileStatus, bus, eventsFilter = eventsFilter)
 
-listener.applicationInfo.foreach { app =>
-  // Invalidate the existing UI for the reloaded app attempt, if any. 
See LoadedAppUI for a
-  // discussion on the UI lifecycle.
-  synchronized {
-activeUIs.get((app.info.id, 
app.attempts.head.info.attemptId)).foreach { ui =>
-  ui.invalidate()
-  ui.ui.store.close()
+val (appId, attemptId) = listener.applicationInfo match {
+  case Some(app) =>
+// Invalidate the existing UI for the reloaded app attempt, if 
any. See LoadedAppUI for a
+// discussion on the UI lifecycle.
+synchronized {
+  activeUIs.get((app.info.id, 
app.attempts.head.info.attemptId)).foreach { ui =>
+ui.invalidate()
+ui.ui.store.close()
+  }
 }
-  }
 
-  addListing(app)
+addListing(app)
+(Some(app.info.id), app.attempts.head.info.attemptId)
+
+  case _ =>
+(None, None)
--- End diff --

I think a comment here would help, explaining that writing an entry with no appId will mark this log file as eligible for automatic cleanup if it's still in that state after max_log_age.  (if I understood correctly)


---




[GitHub] spark issue #20138: [SPARK-20664][core] Delete stale application data from S...

2018-01-11 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20138
  
well, perhaps I mis-represented this -- you still need to turn the event 
log cleaning on explicitly with the old option, 
"spark.history.fs.cleaner.enabled".  This just doesn't include the "aggressive" 
option that was originally proposed by @ericvandenbergfb


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r161095561
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -405,49 +404,70 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 try {
   val newLastScanTime = getNewLastScanTime()
   logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
-  // scan for modified applications, replay and merge them
-  val logInfos = Option(fs.listStatus(new 
Path(logDir))).map(_.toSeq).getOrElse(Nil)
+
+  val updated = Option(fs.listStatus(new 
Path(logDir))).map(_.toSeq).getOrElse(Nil)
 .filter { entry =>
   !entry.isDirectory() &&
 // FsHistoryProvider generates a hidden file which can't be 
read.  Accidentally
 // reading a garbage file is safe, but we would log an error 
which can be scary to
 // the end-user.
 !entry.getPath().getName().startsWith(".") &&
-SparkHadoopUtil.get.checkAccessPermission(entry, 
FsAction.READ) &&
-recordedFileSize(entry.getPath()) < entry.getLen()
+SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+}
+.filter { entry =>
+  try {
+val info = listing.read(classOf[LogInfo], 
entry.getPath().toString())
+if (info.fileSize < entry.getLen()) {
+  // Log size has changed, it should be parsed.
+  true
+} else {
+  // If the SHS view has a valid application, update the time 
the file was last seen so
+  // that the entry is not deleted from the SHS listing.
+  if (info.appId.isDefined) {
+listing.write(info.copy(lastProcessed = newLastScanTime))
+  }
+  false
+}
+  } catch {
+case _: NoSuchElementException =>
+  // If the file is currently not being tracked by the SHS, 
add an entry for it and try
+  // to parse it. This will allow the cleaner code to detect 
the file as stale later on
+  // if it was not possible to parse it.
+  listing.write(LogInfo(entry.getPath().toString(), 
newLastScanTime, None, None,
+entry.getLen()))
+  entry.getLen() > 0
+  }
 }
 .sortWith { case (entry1, entry2) =>
   entry1.getModificationTime() > entry2.getModificationTime()
 }
 
-  if (logInfos.nonEmpty) {
-logDebug(s"New/updated attempts found: ${logInfos.size} 
${logInfos.map(_.getPath)}")
+  if (updated.nonEmpty) {
+logDebug(s"New/updated attempts found: ${updated.size} 
${updated.map(_.getPath)}")
   }
 
-  var tasks = mutable.ListBuffer[Future[_]]()
-
-  try {
-for (file <- logInfos) {
-  tasks += replayExecutor.submit(new Runnable {
-override def run(): Unit = mergeApplicationListing(file)
+  val tasks = updated.map { entry =>
+try {
+  replayExecutor.submit(new Runnable {
+override def run(): Unit = mergeApplicationListing(entry, 
newLastScanTime)
   })
+} catch {
+  // let the iteration over logInfos break, since an exception on
--- End diff --

and actually you've moved the try/catch, so this is no longer true: you'll continue to submit all tasks even if one throws an exception.  (I guess I'm not really sure why the old code did it that way ...)


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r161082423
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -405,49 +404,70 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     try {
       val newLastScanTime = getNewLastScanTime()
       logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
-      // scan for modified applications, replay and merge them
-      val logInfos = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
+
+      val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
         .filter { entry =>
           !entry.isDirectory() &&
             // FsHistoryProvider generates a hidden file which can't be read.  Accidentally
             // reading a garbage file is safe, but we would log an error which can be scary to
             // the end-user.
             !entry.getPath().getName().startsWith(".") &&
-            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) &&
-            recordedFileSize(entry.getPath()) < entry.getLen()
+            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+        }
+        .filter { entry =>
+          try {
+            val info = listing.read(classOf[LogInfo], entry.getPath().toString())
+            if (info.fileSize < entry.getLen()) {
+              // Log size has changed, it should be parsed.
+              true
+            } else {
+              // If the SHS view has a valid application, update the time the file was last seen so
+              // that the entry is not deleted from the SHS listing.
+              if (info.appId.isDefined) {
+                listing.write(info.copy(lastProcessed = newLastScanTime))
+              }
+              false
+            }
+          } catch {
+            case _: NoSuchElementException =>
+              // If the file is currently not being tracked by the SHS, add an entry for it and try
+              // to parse it. This will allow the cleaner code to detect the file as stale later on
+              // if it was not possible to parse it.
+              listing.write(LogInfo(entry.getPath().toString(), newLastScanTime, None, None,
+                entry.getLen()))
+              entry.getLen() > 0
+          }
         }
        .sortWith { case (entry1, entry2) =>
          entry1.getModificationTime() > entry2.getModificationTime()
        }
 
-      if (logInfos.nonEmpty) {
-        logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
+      if (updated.nonEmpty) {
+        logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}")
       }
 
-      var tasks = mutable.ListBuffer[Future[_]]()
-
-      try {
-        for (file <- logInfos) {
-          tasks += replayExecutor.submit(new Runnable {
-            override def run(): Unit = mergeApplicationListing(file)
+      val tasks = updated.map { entry =>
+        try {
+          replayExecutor.submit(new Runnable {
+            override def run(): Unit = mergeApplicationListing(entry, newLastScanTime)
           })
+        } catch {
+          // let the iteration over logInfos break, since an exception on
--- End diff --

you've renamed `logInfos` to `updated`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20236: [SPARK-23044] Error handling for jira assignment

2018-01-11 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20236
  
@vanzin what exactly are you looking for?  The one thing which would be 
easy is letting you write in an arbitrary jira id (no name searching or 
anything) -- does that sound OK?

I guess this bug isn't really a major issue, so there's no urgency in 
getting this in; I can add that here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20236: [SPARK-23044] Error handling for jira assignment

2018-01-11 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20236
  
@jerryshao this should fix it, but I don't have anything to merge to test 
this out -- I'd appreciate it if someone could try it before we merge.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20236: [SPARK-23044] Error handling for jira assignment

2018-01-11 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/20236

[SPARK-23044] Error handling for jira assignment

## What changes were proposed in this pull request?

In case the selected user isn't a contributor yet, or any other unexpected 
error occurs, just don't assign the jira.
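
The merge script itself is python, but the intended behavior is just 
"best effort" assignment; a hedged scala sketch (all names here are made 
up for illustration):

    import scala.util.control.NonFatal

    object JiraAssignSketch {
      // Hypothetical stand-in for the real JIRA client call, which can fail
      // e.g. when the selected user isn't a contributor yet.
      def assign(jiraId: String, assignee: String): Unit =
        throw new IllegalArgumentException(s"$assignee is not a contributor")

      def assignBestEffort(jiraId: String, assignee: String): Unit = {
        try {
          assign(jiraId, assignee)
        } catch {
          case NonFatal(e) =>
            // Warn and move on rather than failing the whole merge.
            println(s"Could not assign $jiraId to $assignee, skipping: ${e.getMessage}")
        }
      }
    }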

## How was this patch tested?

Couldn't really test the error case; just did some testing of similar-ish 
code in the python shell.  Haven't run a merge yet.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark SPARK-23044

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20236.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20236


commit 8c4cc61c2a06b310480fafb3b28067a6f961816a
Author: Imran Rashid <irashid@...>
Date:   2018-01-11T15:42:16Z

[SPARK-23044] Error handling for jira assignment




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20056: [SPARK-22878] [CORE] Count totalDroppedEvents for LiveLi...

2018-01-10 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20056
  
I have the same question as @jiangxb1987: what is the situation where 
you'd use this metric?  The jira doesn't say either.  It seems like the 
existing metrics mostly cover this?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20013: [SPARK-20657][core] Speed up rendering of the stages pag...

2018-01-05 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20013
  
lgtm


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20013: [SPARK-20657][core] Speed up rendering of the sta...

2018-01-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20013#discussion_r160017859
  
--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -119,118 +121,115 @@ private class LiveTask(
 
   import LiveEntityHelpers._
 
-  private var recordedMetrics: v1.TaskMetrics = null
+  private var metrics: MetricsTracker = new MetricsTracker()
 
   var errorMessage: Option[String] = None
 
   /**
    * Update the metrics for the task and return the difference between the previous and new
    * values.
    */
-  def updateMetrics(metrics: TaskMetrics): v1.TaskMetrics = {
+  def updateMetrics(metrics: TaskMetrics): MetricsTracker = {
     if (metrics != null) {
-      val old = recordedMetrics
-      recordedMetrics = new v1.TaskMetrics(
-        metrics.executorDeserializeTime,
-        metrics.executorDeserializeCpuTime,
-        metrics.executorRunTime,
-        metrics.executorCpuTime,
-        metrics.resultSize,
-        metrics.jvmGCTime,
-        metrics.resultSerializationTime,
-        metrics.memoryBytesSpilled,
-        metrics.diskBytesSpilled,
-        metrics.peakExecutionMemory,
-        new v1.InputMetrics(
-          metrics.inputMetrics.bytesRead,
-          metrics.inputMetrics.recordsRead),
-        new v1.OutputMetrics(
-          metrics.outputMetrics.bytesWritten,
-          metrics.outputMetrics.recordsWritten),
-        new v1.ShuffleReadMetrics(
-          metrics.shuffleReadMetrics.remoteBlocksFetched,
-          metrics.shuffleReadMetrics.localBlocksFetched,
-          metrics.shuffleReadMetrics.fetchWaitTime,
-          metrics.shuffleReadMetrics.remoteBytesRead,
-          metrics.shuffleReadMetrics.remoteBytesReadToDisk,
-          metrics.shuffleReadMetrics.localBytesRead,
-          metrics.shuffleReadMetrics.recordsRead),
-        new v1.ShuffleWriteMetrics(
-          metrics.shuffleWriteMetrics.bytesWritten,
-          metrics.shuffleWriteMetrics.writeTime,
-          metrics.shuffleWriteMetrics.recordsWritten))
-      if (old != null) calculateMetricsDelta(recordedMetrics, old) else recordedMetrics
+      val old = this.metrics
+      val newMetrics = new MetricsTracker()
+      newMetrics.executorDeserializeTime = metrics.executorDeserializeTime
+      newMetrics.executorDeserializeCpuTime = metrics.executorDeserializeCpuTime
+      newMetrics.executorRunTime = metrics.executorRunTime
+      newMetrics.executorCpuTime = metrics.executorCpuTime
+      newMetrics.resultSize = metrics.resultSize
+      newMetrics.jvmGcTime = metrics.jvmGCTime
+      newMetrics.resultSerializationTime = metrics.resultSerializationTime
+      newMetrics.memoryBytesSpilled = metrics.memoryBytesSpilled
+      newMetrics.diskBytesSpilled = metrics.diskBytesSpilled
+      newMetrics.peakExecutionMemory = metrics.peakExecutionMemory
+      newMetrics.inputBytesRead = metrics.inputMetrics.bytesRead
+      newMetrics.inputRecordsRead = metrics.inputMetrics.recordsRead
+      newMetrics.outputBytesWritten = metrics.outputMetrics.bytesWritten
+      newMetrics.outputRecordsWritten = metrics.outputMetrics.recordsWritten
+      newMetrics.shuffleRemoteBlocksFetched = metrics.shuffleReadMetrics.remoteBlocksFetched
+      newMetrics.shuffleLocalBlocksFetched = metrics.shuffleReadMetrics.localBlocksFetched
+      newMetrics.shuffleFetchWaitTime = metrics.shuffleReadMetrics.fetchWaitTime
+      newMetrics.shuffleRemoteBytesRead = metrics.shuffleReadMetrics.remoteBytesRead
+      newMetrics.shuffleRemoteBytesReadToDisk = metrics.shuffleReadMetrics.remoteBytesReadToDisk
+      newMetrics.shuffleLocalBytesRead = metrics.shuffleReadMetrics.localBytesRead
+      newMetrics.shuffleRecordsRead = metrics.shuffleReadMetrics.recordsRead
+      newMetrics.shuffleBytesWritten = metrics.shuffleWriteMetrics.bytesWritten
+      newMetrics.shuffleWriteTime = metrics.shuffleWriteMetrics.writeTime
+      newMetrics.shuffleRecordsWritten = metrics.shuffleWriteMetrics.recordsWritten
+
+      this.metrics = newMetrics
+      if (old.executorDeserializeTime >= 0L) {
+        old.subtract(newMetrics)
+        old
+      } else {
+        newMetrics
+      }
     } else {
       null
     }
   }
 
-  /**
-   * Return a new TaskMetrics object containing the delta of the various fields of the given
-   * metrics objects. This is currently targeted at updating stage data, so it does not
-   * necessarily calculate deltas for all the fields.
-   */
-  private def calculateMetricsDelta(
-      metrics: v1.TaskMetrics,
-      old: v1.TaskMetrics): v1.TaskMetrics = {
-
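
The pattern in the hunk above -- mutable fields initialized to a negative 
sentinel meaning "no metrics recorded yet", plus an in-place subtract that 
turns the old tracker into a delta -- in a simplified, hypothetical 
two-field sketch (the subtraction direction and field set here are my 
assumptions, not the PR's exact bookkeeping):

    class MetricsTrackerSketch {
      var executorRunTime: Long = -1L  // -1 means "nothing recorded yet"
      var recordsRead: Long = -1L

      def subtract(newer: MetricsTrackerSketch): Unit = {
        // Overwrite this tracker's fields with the change since the last
        // snapshot.
        executorRunTime = newer.executorRunTime - executorRunTime
        recordsRead = newer.recordsRead - recordsRead
      }
    }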

[GitHub] spark pull request #20013: [SPARK-20657][core] Speed up rendering of the sta...

2018-01-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20013#discussion_r159988995
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -110,107 +114,240 @@ private[spark] class AppStatusStore(
     if (details) stageWithDetails(stage) else stage
   }
 
+  def taskCount(stageId: Int, stageAttemptId: Int): Long = {
+    store.count(classOf[TaskDataWrapper], "stage", Array(stageId, stageAttemptId))
+  }
+
+  def localitySummary(stageId: Int, stageAttemptId: Int): Map[String, Long] = {
+    store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
+  }
+
+  /**
+   * Calculates a summary of the task metrics for the given stage attempt, returning the
+   * requested quantiles for the recorded metrics.
+   *
+   * This method can be expensive if the requested quantiles are not cached; the method
+   * will only cache certain quantiles (every 0.05 step), so it's recommended to stick to
+   * those to avoid expensive scans of all task data.
+   */
   def taskSummary(
       stageId: Int,
       stageAttemptId: Int,
-      quantiles: Array[Double]): v1.TaskMetricDistributions = {
-
-    val stage = Array(stageId, stageAttemptId)
-
-    val rawMetrics = store.view(classOf[TaskDataWrapper])
-      .index("stage")
-      .first(stage)
-      .last(stage)
-      .asScala
-      .flatMap(_.info.taskMetrics)
-      .toList
-      .view
-
-    def metricQuantiles(f: v1.TaskMetrics => Double): IndexedSeq[Double] =
-      Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles)
-
-    // We need to do a lot of similar munging to nested metrics here.  For each one,
-    // we want (a) extract the values for nested metrics (b) make a distribution for each metric
-    // (c) shove the distribution into the right field in our return type and (d) only return
-    // a result if the option is defined for any of the tasks.  MetricHelper is a little util
-    // to make it a little easier to deal w/ all of the nested options.  Mostly it lets us just
-    // implement one "build" method, which just builds the quantiles for each field.
-
-    val inputMetrics =
-      new MetricHelper[v1.InputMetrics, v1.InputMetricDistributions](rawMetrics, quantiles) {
-        def getSubmetrics(raw: v1.TaskMetrics): v1.InputMetrics = raw.inputMetrics
-
-        def build: v1.InputMetricDistributions = new v1.InputMetricDistributions(
-          bytesRead = submetricQuantiles(_.bytesRead),
-          recordsRead = submetricQuantiles(_.recordsRead)
-        )
-      }.build
-
-    val outputMetrics =
-      new MetricHelper[v1.OutputMetrics, v1.OutputMetricDistributions](rawMetrics, quantiles) {
-        def getSubmetrics(raw: v1.TaskMetrics): v1.OutputMetrics = raw.outputMetrics
-
-        def build: v1.OutputMetricDistributions = new v1.OutputMetricDistributions(
-          bytesWritten = submetricQuantiles(_.bytesWritten),
-          recordsWritten = submetricQuantiles(_.recordsWritten)
-        )
-      }.build
-
-    val shuffleReadMetrics =
-      new MetricHelper[v1.ShuffleReadMetrics, v1.ShuffleReadMetricDistributions](rawMetrics,
-        quantiles) {
-        def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleReadMetrics =
-          raw.shuffleReadMetrics
-
-        def build: v1.ShuffleReadMetricDistributions = new v1.ShuffleReadMetricDistributions(
-          readBytes = submetricQuantiles { s => s.localBytesRead + s.remoteBytesRead },
-          readRecords = submetricQuantiles(_.recordsRead),
-          remoteBytesRead = submetricQuantiles(_.remoteBytesRead),
-          remoteBytesReadToDisk = submetricQuantiles(_.remoteBytesReadToDisk),
-          remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched),
-          localBlocksFetched = submetricQuantiles(_.localBlocksFetched),
-          totalBlocksFetched = submetricQuantiles { s =>
-            s.localBlocksFetched + s.remoteBlocksFetched
-          },
-          fetchWaitTime = submetricQuantiles(_.fetchWaitTime)
-        )
-      }.build
-
-    val shuffleWriteMetrics =
-      new MetricHelper[v1.ShuffleWriteMetrics, v1.ShuffleWriteMetricDistributions](rawMetrics,
-        quantiles) {
-        def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleWriteMetrics =
-          raw.shuffleWriteMetrics
-
-        def build: v1.ShuffleWriteMetricDistributions = new v1.ShuffleWriteMetricDistributions(
-          writeBytes = submetricQuantiles(_.bytesWritten),
-          writeRecords = submetricQuantiles(_.recordsWritten),
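
A hypothetical caller sketch of the caching note in the doc comment above 
(the appStatusStore instance and the return handling are assumed):

    // Quantiles on the 0.05 grid (e.g. the usual five-number summary) can
    // be served from the cached distributions.
    val cached = appStatusStore.taskSummary(stageId = 1, stageAttemptId = 0,
      quantiles = Array(0.0, 0.25, 0.5, 0.75, 1.0))

    // Off-grid quantiles force a scan over all of the stage's task data.
    val expensive = appStatusStore.taskSummary(1, 0, Array(0.013, 0.987))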

[GitHub] spark pull request #20013: [SPARK-20657][core] Speed up rendering of the sta...

2018-01-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20013#discussion_r159992789
  

[GitHub] spark pull request #20013: [SPARK-20657][core] Speed up rendering of the sta...

2018-01-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20013#discussion_r159990658
  

[GitHub] spark issue #20082: [SPARK-22897][CORE]: Expose stageAttemptId in TaskContex...

2018-01-04 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20082
  
sorry this is a little late, but lgtm too.  I agree with the points above 
about leaving the old name deprecated and moving to the new name.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19992: [SPARK-22805][CORE] Use StorageLevel aliases in event lo...

2018-01-04 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19992
  
the change is fine, but from the discussion on the jira I'm unclear if this 
is really worth it -- the gain seems pretty small after the other fix in 2.3.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19992: [SPARK-22805][CORE] Use StorageLevel aliases in e...

2018-01-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19992#discussion_r159812291
  
--- Diff: core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
---
@@ -2022,12 +1947,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
   |  "Port": 300
   |},
   |"Block ID": "rdd_0_0",
-  |"Storage Level": {
--- End diff --

yup, I completely agree that off heap is not respected in the json format.  
Can you file a bug?  I think it's still relevant even after this goes in, for 
custom levels.
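
For reference, a sketch of the kind of custom level that hits this: the 
event-log JSON for a storage level carries no "Use Off Heap" field, so a 
level like the one below can't round-trip (the flags mirror the built-in 
OFF_HEAP level):

    import org.apache.spark.storage.StorageLevel

    // Off-heap caching: serialized bytes, with disk, memory, and off-heap
    // storage all enabled.
    val offHeapLevel = StorageLevel(
      useDisk = true,
      useMemory = true,
      useOffHeap = true,
      deserialized = false,
      replication = 1)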


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...

2018-01-04 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19848
  
@steveloughran can you bring this up on dev@?  we should move this 
discussion off of this PR.

(sorry haven't had a chance to look yet, but I appreciate you doing this)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20039: [SPARK-22850][core] Ensure queued events are delivered t...

2018-01-04 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20039
  
merged to master / 2.3


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20117: [SPARK-22921][PROJECT-INFRA] Bug fix in jira assigning

2017-12-29 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20117
  
merged to master


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


