[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20244#discussion_r165764166 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * In this test, we simply simulate the scene in concurrent jobs using the same + * rdd which is marked to do checkpoint: + * Job one has already finished the spark job, and start the process of doCheckpoint; + * Job two is submitted, and submitMissingTasks is called. + * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done, + * while part calculates from stage.rdd.partitions is called after doCheckpoint is done, + * we may get a ClassCastException when execute the task because of some rdd will do + * Partition cast. + * + * With this test case, just want to indicate that we should do taskSerialization and + * part calculate in submitMissingTasks with the same rdd checkpoint status. + */ + test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") { +// set checkpointDir. +val tempDir = Utils.createTempDir() +val checkpointDir = File.createTempFile("temp", "", tempDir) +checkpointDir.delete() +sc.setCheckpointDir(checkpointDir.toString) + +// Semaphores to control the process sequence for the two threads below. +val semaphore1 = new Semaphore(0) +val semaphore2 = new Semaphore(0) + +val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4)) +rdd.checkpoint() + +val checkpointRunnable = new Runnable { + override def run() = { +// Simply simulate what RDD.doCheckpoint() do here. 
+rdd.doCheckpointCalled = true +val checkpointData = rdd.checkpointData.get +RDDCheckpointData.synchronized { + if (checkpointData.cpState == CheckpointState.Initialized) { +checkpointData.cpState = CheckpointState.CheckpointingInProgress + } +} + +val newRDD = checkpointData.doCheckpoint() + +// Release semaphore1 after job triggered in checkpoint finished, so that taskBinary +// serialization can start. +semaphore1.release() +// Wait until taskBinary serialization finished in submitMissingTasksThread. +semaphore2.acquire() + +// Update our state and truncate the RDD lineage. +RDDCheckpointData.synchronized { + checkpointData.cpRDD = Some(newRDD) + checkpointData.cpState = CheckpointState.Checkpointed + rdd.markCheckpointed() +} +semaphore1.release() + } +} + +val submitMissingTasksRunnable = new Runnable { + override def run() = { +// Simply simulate the process of submitMissingTasks. +// Wait until doCheckpoint job running finished, but checkpoint status not changed. +semaphore1.acquire() + +val ser = SparkEnv.get.closureSerializer.newInstance() + +// Simply simulate task serialization while submitMissingTasks. +// Task serialized with rdd checkpoint not finished. +val cleanedFunc = sc.clean(Utils.getIteratorSize _) +val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it) +val taskBinaryBytes = JavaUtils.bufferToArray( + ser.serialize((rdd, func): AnyRef)) +// Because partition calculate is in a synchronized block, so in the fixed code +// partition is calculated here. +val correctPart = rdd.partitions(0) + +// Release semaphore2 so changing checkpoint status to Checkpointed will be done in +// checkpointThread. +semaphore2.release() +// Wait until checkpoint status changed to Checkpointed in checkpointThread. +semaphore1.acquire() + +// Part calculated with rdd checkpoint already finished. +val errPart = rdd.partitions(0) + +// TaskBinary will be deserialized when run task in executor. 
+val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Unit)]( + ByteBuffer.wrap(taskBinaryBytes), Thread.currentThread.getContextClassLoader) + +val taskContext = mock(classOf[TaskContext]) +doNothing().when(taskContext).killTaskIfInterrupted() + +// ClassCastException is expected with errPart. --- End diff -- I think this is a bit easier to follow if you say Make sure our test
[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20244#discussion_r165763669 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * In this test, we simply simulate the scene in concurrent jobs using the same + * rdd which is marked to do checkpoint: + * Job one has already finished the spark job, and start the process of doCheckpoint; + * Job two is submitted, and submitMissingTasks is called. + * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done, + * while part calculates from stage.rdd.partitions is called after doCheckpoint is done, + * we may get a ClassCastException when execute the task because of some rdd will do + * Partition cast. + * + * With this test case, just want to indicate that we should do taskSerialization and + * part calculate in submitMissingTasks with the same rdd checkpoint status. + */ + test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") { +// set checkpointDir. +val tempDir = Utils.createTempDir() +val checkpointDir = File.createTempFile("temp", "", tempDir) +checkpointDir.delete() +sc.setCheckpointDir(checkpointDir.toString) + +// Semaphores to control the process sequence for the two threads below. +val semaphore1 = new Semaphore(0) +val semaphore2 = new Semaphore(0) + +val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4)) +rdd.checkpoint() + +val checkpointRunnable = new Runnable { + override def run() = { +// Simply simulate what RDD.doCheckpoint() do here. 
+rdd.doCheckpointCalled = true +val checkpointData = rdd.checkpointData.get +RDDCheckpointData.synchronized { + if (checkpointData.cpState == CheckpointState.Initialized) { +checkpointData.cpState = CheckpointState.CheckpointingInProgress + } +} + +val newRDD = checkpointData.doCheckpoint() + +// Release semaphore1 after job triggered in checkpoint finished, so that taskBinary +// serialization can start. +semaphore1.release() +// Wait until taskBinary serialization finished in submitMissingTasksThread. +semaphore2.acquire() + +// Update our state and truncate the RDD lineage. +RDDCheckpointData.synchronized { + checkpointData.cpRDD = Some(newRDD) + checkpointData.cpState = CheckpointState.Checkpointed + rdd.markCheckpointed() +} +semaphore1.release() + } +} + +val submitMissingTasksRunnable = new Runnable { + override def run() = { +// Simply simulate the process of submitMissingTasks. +// Wait until doCheckpoint job running finished, but checkpoint status not changed. +semaphore1.acquire() + +val ser = SparkEnv.get.closureSerializer.newInstance() + +// Simply simulate task serialization while submitMissingTasks. +// Task serialized with rdd checkpoint not finished. +val cleanedFunc = sc.clean(Utils.getIteratorSize _) +val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it) +val taskBinaryBytes = JavaUtils.bufferToArray( + ser.serialize((rdd, func): AnyRef)) +// Because partition calculate is in a synchronized block, so in the fixed code +// partition is calculated here. +val correctPart = rdd.partitions(0) + +// Release semaphore2 so changing checkpoint status to Checkpointed will be done in +// checkpointThread. +semaphore2.release() +// Wait until checkpoint status changed to Checkpointed in checkpointThread. +semaphore1.acquire() + +// Part calculated with rdd checkpoint already finished. 
--- End diff -- I'd add a comment above this: Now we're done simulating the interleaving that might happen within the scheduler -- we'll check to make sure the final state is OK by simulating a couple steps that normally happen on the executor. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
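To make the executor-side failure concrete, here is a minimal, self-contained Scala sketch of why a stale RDD paired with a post-checkpoint partition throws. `ParentPartition`, `CheckpointPartition`, and `runTask` are hypothetical stand-ins, not Spark classes. The only thing carried over from the test is the assumption that the original RDD's compute path casts the `Partition` it receives to its own partition type.

```scala
object PartitionMismatchSketch {
  // Hypothetical stand-ins for Spark's Partition hierarchy (not Spark classes).
  trait Partition { def index: Int }
  final case class ParentPartition(index: Int) extends Partition
  final case class CheckpointPartition(index: Int) extends Partition

  // Mimics compute() of an RDD whose lineage was serialized before the checkpoint
  // finished: it casts the partition to the type its own partitions() produced.
  private def computeOriginal(split: Partition): Int =
    split.asInstanceOf[ParentPartition].index * 10

  // Simulates running the deserialized task on a partition computed either
  // before (matching) or after (mismatching) the checkpoint status changed.
  def runTask(split: Partition): Either[String, Int] =
    try Right(computeOriginal(split))
    catch { case e: ClassCastException => Left(e.getClass.getSimpleName) }
}
```

With a matching partition the task computes normally. With a partition taken after checkpointing it fails with the same exception class the test intercepts.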
[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20244#discussion_r165761754 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * In this test, we simply simulate the scene in concurrent jobs using the same + * rdd which is marked to do checkpoint: + * Job one has already finished the spark job, and start the process of doCheckpoint; + * Job two is submitted, and submitMissingTasks is called. + * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done, + * while part calculates from stage.rdd.partitions is called after doCheckpoint is done, + * we may get a ClassCastException when execute the task because of some rdd will do + * Partition cast. + * + * With this test case, just want to indicate that we should do taskSerialization and + * part calculate in submitMissingTasks with the same rdd checkpoint status. + */ + test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") { +// set checkpointDir. +val tempDir = Utils.createTempDir() +val checkpointDir = File.createTempFile("temp", "", tempDir) +checkpointDir.delete() +sc.setCheckpointDir(checkpointDir.toString) + +// Semaphores to control the process sequence for the two threads below. +val semaphore1 = new Semaphore(0) +val semaphore2 = new Semaphore(0) + +val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4)) +rdd.checkpoint() + +val checkpointRunnable = new Runnable { + override def run() = { +// Simply simulate what RDD.doCheckpoint() do here. --- End diff -- I'd remove "simply" here and elsewhere in comments. Also "do" -> "does".
[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20244#discussion_r165761207 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * In this test, we simply simulate the scene in concurrent jobs using the same + * rdd which is marked to do checkpoint: + * Job one has already finished the spark job, and start the process of doCheckpoint; + * Job two is submitted, and submitMissingTasks is called. + * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done, + * while part calculates from stage.rdd.partitions is called after doCheckpoint is done, + * we may get a ClassCastException when execute the task because of some rdd will do + * Partition cast. + * + * With this test case, just want to indicate that we should do taskSerialization and + * part calculate in submitMissingTasks with the same rdd checkpoint status. + */ + test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") { +// set checkpointDir. +val tempDir = Utils.createTempDir() +val checkpointDir = File.createTempFile("temp", "", tempDir) +checkpointDir.delete() --- End diff -- Why do you make a temp file for the checkpoint dir and then delete it? Why not just `checkpointDir = new File(tempDir, "checkpointing")`? Or even just `checkpointDir = Utils.createTempDir()`? (CheckpointSuite does this so it can call `sc.setCheckpointDir`, but you're not doing that here.)
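A minimal sketch of the two simpler alternatives, using only `java.nio.file.Files` so it runs outside Spark. `freshTempDir` stands in for Spark's `Utils.createTempDir()` (assumed to behave roughly the same way, minus Spark's shutdown cleanup), and `childDir` is the `new File(tempDir, "checkpointing")` option. Both helper names are hypothetical.

```scala
import java.io.File
import java.nio.file.Files

object CheckpointDirSketch {
  // Option 1: create a brand-new empty directory directly (roughly what a
  // test helper like Utils.createTempDir() gives you).
  def freshTempDir(prefix: String): File =
    Files.createTempDirectory(prefix).toFile

  // Option 2: just name a child of an existing temp dir. Nothing is created on
  // disk, so there is no need for the createTempFile-then-delete dance.
  def childDir(tempDir: File): File = new File(tempDir, "checkpointing")
}
```

Option 2 assumes whoever consumes the path creates the directory if it needs one. That is an assumption about the caller, not something this sketch checks.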
[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20244#discussion_r165763018 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * In this test, we simply simulate the scene in concurrent jobs using the same + * rdd which is marked to do checkpoint: + * Job one has already finished the spark job, and start the process of doCheckpoint; + * Job two is submitted, and submitMissingTasks is called. + * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done, + * while part calculates from stage.rdd.partitions is called after doCheckpoint is done, + * we may get a ClassCastException when execute the task because of some rdd will do + * Partition cast. + * + * With this test case, just want to indicate that we should do taskSerialization and + * part calculate in submitMissingTasks with the same rdd checkpoint status. + */ + test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") { +// set checkpointDir. +val tempDir = Utils.createTempDir() +val checkpointDir = File.createTempFile("temp", "", tempDir) +checkpointDir.delete() +sc.setCheckpointDir(checkpointDir.toString) + +// Semaphores to control the process sequence for the two threads below. +val semaphore1 = new Semaphore(0) +val semaphore2 = new Semaphore(0) + +val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4)) +rdd.checkpoint() + +val checkpointRunnable = new Runnable { + override def run() = { +// Simply simulate what RDD.doCheckpoint() do here. 
+rdd.doCheckpointCalled = true +val checkpointData = rdd.checkpointData.get +RDDCheckpointData.synchronized { + if (checkpointData.cpState == CheckpointState.Initialized) { +checkpointData.cpState = CheckpointState.CheckpointingInProgress + } +} + +val newRDD = checkpointData.doCheckpoint() + +// Release semaphore1 after job triggered in checkpoint finished, so that taskBinary +// serialization can start. +semaphore1.release() +// Wait until taskBinary serialization finished in submitMissingTasksThread. +semaphore2.acquire() --- End diff -- This would be a bit easier to follow if you renamed your semaphores: `semaphore1` -> `doCheckpointStarted`, `semaphore2` -> `taskBinaryBytesFinished`.
[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20244#discussion_r165764342 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * In this test, we simply simulate the scene in concurrent jobs using the same + * rdd which is marked to do checkpoint: + * Job one has already finished the spark job, and start the process of doCheckpoint; + * Job two is submitted, and submitMissingTasks is called. + * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done, + * while part calculates from stage.rdd.partitions is called after doCheckpoint is done, + * we may get a ClassCastException when execute the task because of some rdd will do + * Partition cast. + * + * With this test case, just want to indicate that we should do taskSerialization and + * part calculate in submitMissingTasks with the same rdd checkpoint status. + */ + test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") { +// set checkpointDir. +val tempDir = Utils.createTempDir() +val checkpointDir = File.createTempFile("temp", "", tempDir) +checkpointDir.delete() +sc.setCheckpointDir(checkpointDir.toString) + +// Semaphores to control the process sequence for the two threads below. +val semaphore1 = new Semaphore(0) +val semaphore2 = new Semaphore(0) + +val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4)) +rdd.checkpoint() + +val checkpointRunnable = new Runnable { + override def run() = { +// Simply simulate what RDD.doCheckpoint() do here. 
+rdd.doCheckpointCalled = true +val checkpointData = rdd.checkpointData.get +RDDCheckpointData.synchronized { + if (checkpointData.cpState == CheckpointState.Initialized) { +checkpointData.cpState = CheckpointState.CheckpointingInProgress + } +} + +val newRDD = checkpointData.doCheckpoint() + +// Release semaphore1 after job triggered in checkpoint finished, so that taskBinary +// serialization can start. +semaphore1.release() +// Wait until taskBinary serialization finished in submitMissingTasksThread. +semaphore2.acquire() + +// Update our state and truncate the RDD lineage. +RDDCheckpointData.synchronized { + checkpointData.cpRDD = Some(newRDD) + checkpointData.cpState = CheckpointState.Checkpointed + rdd.markCheckpointed() +} +semaphore1.release() + } +} + +val submitMissingTasksRunnable = new Runnable { + override def run() = { +// Simply simulate the process of submitMissingTasks. +// Wait until doCheckpoint job running finished, but checkpoint status not changed. +semaphore1.acquire() + +val ser = SparkEnv.get.closureSerializer.newInstance() + +// Simply simulate task serialization while submitMissingTasks. +// Task serialized with rdd checkpoint not finished. +val cleanedFunc = sc.clean(Utils.getIteratorSize _) +val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it) +val taskBinaryBytes = JavaUtils.bufferToArray( + ser.serialize((rdd, func): AnyRef)) +// Because partition calculate is in a synchronized block, so in the fixed code +// partition is calculated here. +val correctPart = rdd.partitions(0) + +// Release semaphore2 so changing checkpoint status to Checkpointed will be done in +// checkpointThread. +semaphore2.release() +// Wait until checkpoint status changed to Checkpointed in checkpointThread. +semaphore1.acquire() + +// Part calculated with rdd checkpoint already finished. +val errPart = rdd.partitions(0) + +// TaskBinary will be deserialized when run task in executor. 
+val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Unit)]( + ByteBuffer.wrap(taskBinaryBytes), Thread.currentThread.getContextClassLoader) + +val taskContext = mock(classOf[TaskContext]) +doNothing().when(taskContext).killTaskIfInterrupted() + +// ClassCastException is expected with errPart. +intercept[ClassCastException] { + // Triggered when runTask in executor. + t
[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20244#discussion_r165763274 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * In this test, we simply simulate the scene in concurrent jobs using the same + * rdd which is marked to do checkpoint: + * Job one has already finished the spark job, and start the process of doCheckpoint; + * Job two is submitted, and submitMissingTasks is called. + * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done, + * while part calculates from stage.rdd.partitions is called after doCheckpoint is done, + * we may get a ClassCastException when execute the task because of some rdd will do + * Partition cast. + * + * With this test case, just want to indicate that we should do taskSerialization and + * part calculate in submitMissingTasks with the same rdd checkpoint status. + */ + test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") { +// set checkpointDir. +val tempDir = Utils.createTempDir() +val checkpointDir = File.createTempFile("temp", "", tempDir) +checkpointDir.delete() +sc.setCheckpointDir(checkpointDir.toString) + +// Semaphores to control the process sequence for the two threads below. +val semaphore1 = new Semaphore(0) +val semaphore2 = new Semaphore(0) + +val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4)) +rdd.checkpoint() + +val checkpointRunnable = new Runnable { + override def run() = { +// Simply simulate what RDD.doCheckpoint() do here. 
+rdd.doCheckpointCalled = true +val checkpointData = rdd.checkpointData.get +RDDCheckpointData.synchronized { + if (checkpointData.cpState == CheckpointState.Initialized) { +checkpointData.cpState = CheckpointState.CheckpointingInProgress + } +} + +val newRDD = checkpointData.doCheckpoint() + +// Release semaphore1 after job triggered in checkpoint finished, so that taskBinary +// serialization can start. +semaphore1.release() +// Wait until taskBinary serialization finished in submitMissingTasksThread. +semaphore2.acquire() + +// Update our state and truncate the RDD lineage. +RDDCheckpointData.synchronized { + checkpointData.cpRDD = Some(newRDD) + checkpointData.cpState = CheckpointState.Checkpointed + rdd.markCheckpointed() +} +semaphore1.release() --- End diff -- And then this would be another semaphore, `checkpointStateUpdated`.
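A minimal sketch of the same two-thread handshake using the descriptive names suggested in this thread (`doCheckpointStarted`, `taskBinaryBytesFinished`, `checkpointStateUpdated`). To keep it self-contained, the threads only record which step they reached instead of doing real checkpointing or serialization. The point is that the names make the ordering readable at each `acquire`/`release`.

```scala
import java.util.concurrent.Semaphore
import scala.collection.mutable.ArrayBuffer

object NamedSemaphoreSketch {
  def simulate(): List[String] = {
    val log = ArrayBuffer.empty[String]
    def record(step: String): Unit = log.synchronized { log += step; () }

    val doCheckpointStarted = new Semaphore(0)
    val taskBinaryBytesFinished = new Semaphore(0)
    val checkpointStateUpdated = new Semaphore(0)

    val checkpointThread = new Thread(new Runnable {
      override def run(): Unit = {
        record("checkpoint job done")
        doCheckpointStarted.release()     // let serialization begin
        taskBinaryBytesFinished.acquire() // wait for serialization to finish
        record("checkpoint state updated")
        checkpointStateUpdated.release()  // let partitions be read
      }
    })

    val submitThread = new Thread(new Runnable {
      override def run(): Unit = {
        doCheckpointStarted.acquire()
        record("task binary serialized")
        taskBinaryBytesFinished.release()
        checkpointStateUpdated.acquire()
        record("partitions read after checkpoint")
      }
    })

    checkpointThread.start(); submitThread.start()
    checkpointThread.join(); submitThread.join()
    log.synchronized(log.toList)
  }
}
```

Because every step is gated by one of the three semaphores, the recorded order is deterministic regardless of thread scheduling.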
[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20244#discussion_r165759800 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1016,15 +1016,23 @@ class DAGScheduler( // might modify state of objects referenced in their closures. This is necessary in Hadoop // where the JobConf/Configuration object is not thread-safe. var taskBinary: Broadcast[Array[Byte]] = null +var partitions: Array[Partition] = null try { // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep). // For ResultTask, serialize and broadcast (rdd, func). - val taskBinaryBytes: Array[Byte] = stage match { -case stage: ShuffleMapStage => - JavaUtils.bufferToArray( -closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) -case stage: ResultStage => - JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) + var taskBinaryBytes: Array[Byte] = null + // Add synchronized block to avoid rdd deserialized from taskBinaryBytes has diff checkpoint + // status with the rdd when create ShuffleMapTask or ResultTask. --- End diff -- I'd reword this a bit: taskBinaryBytes and partitions are both affected by the checkpoint status. We need this synchronization in case another concurrent job is checkpointing this RDD, so we get a consistent view of both variables.
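A minimal sketch of the pattern described in that comment. It assumes a single shared lock object stands in for `RDDCheckpointData` and a toy class stands in for the RDD. Both the "serialized" form and the partitions are read inside one `synchronized` block on the same lock the checkpointer uses, so a reader cannot see one value from before the checkpoint and the other from after. `ToyRdd`, `markCheckpointed`, and `snapshot` are hypothetical names.

```scala
object ConsistentSnapshotSketch {
  // Toy RDD-like object: both its serialized form and its partitions depend
  // on whether checkpointing has completed. All access goes through the lock.
  final class ToyRdd {
    var checkpointed: Boolean = false
    def serialize(): String =
      if (checkpointed) "checkpointed-lineage" else "original-lineage"
    def partitions: Seq[String] =
      if (checkpointed) Seq("CheckpointPartition") else Seq("ParentPartition")
  }

  // Stand-in for the shared lock (RDDCheckpointData in the PR).
  private val checkpointLock = new Object

  // Checkpointer side: flips the status under the lock.
  def markCheckpointed(rdd: ToyRdd): Unit = checkpointLock.synchronized {
    rdd.checkpointed = true
  }

  // Scheduler side: reads both values under the same lock, so they always
  // reflect the same checkpoint status.
  def snapshot(rdd: ToyRdd): (String, Seq[String]) = checkpointLock.synchronized {
    (rdd.serialize(), rdd.partitions)
  }
}
```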
[GitHub] spark issue #20462: [SPARK-23020][core] Fix another race in the in-process l...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20462 bq. Is there a release notes / ki kinda thing for Spark releases? Not that I know of -- I was just thinking of putting it in the JIRA, since I think that is the best thing users have to search. I know it's not great, but it's something. The current bug description doesn't hint at this at all.
[GitHub] spark issue #20462: [SPARK-23020][core] Fix another race in the in-process l...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20462 lgtm. The fix here makes sense to me; I see how it breaks the test. I'm just wondering: do we need to document this at all for users, e.g. just clearly describe it in the JIRA? I realize most users will never hit this, as it only affects super short apps, but we could just say that it's possible for very short apps to never enter the FINISHED state and instead go to LOST, even though the app finished successfully.
[GitHub] spark issue #20422: [SPARK-23253][Core][Shuffle]Only write shuffle temporary...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20422 Merged to master. Thanks @yaooqinn for doing this and for updating the tests too.
[GitHub] spark issue #19041: [SPARK-21097][CORE] Add option to recover cached data
Github user squito commented on the issue: https://github.com/apache/spark/pull/19041 I thought some more about the race between `RemoveBlock` getting sent back from the executor and the `CacheRecoveryManager` trying to replicate the next block. Actually, why is there a back-and-forth with the driver for every block? Why isn't there just one message from the `CacheRecoveryManager` to the executor saying "Drain all RDD blocks", and then one message from the executor back to the driver when it's done?
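A rough sketch of the single-round-trip design suggested above. `DrainRDDBlocks`, `DrainComplete`, `handleDrain`, and `awaitDrain` are hypothetical names, not existing Spark RPC messages. The `replicate` callback stands in for whatever per-block replication the executor would actually do.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object DrainProtocolSketch {
  // Hypothetical messages: one request from the driver, one reply from the executor.
  final case class DrainRDDBlocks(execId: String)
  final case class DrainComplete(execId: String, replicated: Int, failed: Int)

  // Executor side: walk a local snapshot of the cached RDD blocks once, so no
  // per-block round trip to the driver (and no stale driver-side view) is involved.
  def handleDrain(
      msg: DrainRDDBlocks,
      localBlocks: Seq[String],
      replicate: String => Boolean): Future[DrainComplete] = Future {
    val results = localBlocks.map(replicate)
    DrainComplete(msg.execId, results.count(identity), results.count(ok => !ok))
  }

  // Driver side: wait for the single completion message, or time out.
  def awaitDrain(reply: Future[DrainComplete], timeout: FiniteDuration): DrainComplete =
    Await.result(reply, timeout)
}
```

One trade-off is that the driver loses per-block progress visibility. The executor has to report partial failures in its one reply, which is why the sketch counts them.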
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r165482549 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -420,63 +432,53 @@ private[spark] class ExecutorAllocationManager( * Request the cluster manager to remove the given executors. * Returns the list of executors which are removed. */ - private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized { -val executorIdsToBeRemoved = new ArrayBuffer[String] - + private def removeExecutors(executors: Seq[String]): Unit = synchronized { --- End diff -- The "Returns ..." line in the doc is wrong now that this returns `Unit`.
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r165488466 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala --- @@ -246,6 +251,38 @@ class BlockManagerMasterEndpoint( blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) } + private def recoverLatestRDDBlock( + execId: String, + excludeExecutors: Seq[String], + context: RpcCallContext): Unit = { +logDebug(s"Replicating first cached block on $execId") +val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get) +val response: Option[Future[Boolean]] = for { + blockManagerId <- blockManagerIdByExecutor.get(execId) + info <- blockManagerInfo.get(blockManagerId) + blocks = info.cachedBlocks.collect { case r: RDDBlockId => r } + // we assume blocks from the latest rdd are most relevant + firstBlock <- if (blocks.isEmpty) None else Some(blocks.max[RDDBlockId](Ordering.by(_.rddId))) + replicaSet <- blockLocations.asScala.get(firstBlock) + // Add 2 below because you need the number of replicas, plus one for the original, plus one + // for the new replica. + maxReps = replicaSet.size + 2 +} yield info.slaveEndpoint + .ask[Boolean](ReplicateBlock(firstBlock, replicaSet.toSeq, excluded, maxReps)) + .flatMap { success => +if (success) { + logTrace(s"Replicated block $firstBlock on executor $execId") + replicaSet -= blockManagerId + info.slaveEndpoint.ask[Boolean](RemoveBlock(firstBlock)) --- End diff -- if I understand right, in order for the next iteration to avoid trying to remove the same block over again, you need this call to update the `info.cachedBlocks` used above. But I think that update is async -- even after this future has completed, it doesn't mean `info.cachedBlocks` has been updated. 
I think what will happen in that case is that on the next iteration you'll try to remove the exact same block, which will fail on the executor because it's already been removed. Then, back on the driver, you'll decide to stop trying to replicate the rest of the blocks because of this apparent failure.
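One way to sidestep the async-update race, sketched under the assumption that the replication loop can remember which blocks it has already handled: exclude those explicitly instead of trusting `info.cachedBlocks` to have caught up. `RDDBlockId` here is a local stand-in for Spark's class, and `nextBlock` is hypothetical. Where the handled set would live (the master endpoint or `CacheRecoveryManager`) is left open.

```scala
object SkipHandledBlocksSketch {
  // Local stand-in for Spark's RDDBlockId, for illustration only.
  final case class RDDBlockId(rddId: Int, splitIndex: Int)

  // Picks the next block to replicate from a possibly stale view of the
  // executor's cached blocks, skipping anything this loop has already handled.
  def nextBlock(
      cachedView: Set[RDDBlockId],
      alreadyHandled: Set[RDDBlockId]): Option[RDDBlockId] = {
    val candidates = cachedView -- alreadyHandled
    if (candidates.isEmpty) None else Some(candidates.maxBy(_.rddId))
  }
}
```

Even if the stale view still lists a block that was just removed, it won't be picked again, so a "block not found" failure can't stop the loop early.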
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r165490041 --- Diff: core/src/test/scala/org/apache/spark/CacheRecoveryManagerSuite.scala --- @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark + +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger + +import scala.concurrent.{Future, Promise} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.Duration +import scala.reflect.ClassTag + +import org.mockito.Mockito._ +import org.scalatest.Matchers +import org.scalatest.concurrent.Eventually +import org.scalatest.mockito.MockitoSugar +import org.scalatest.time.{Millis, Span} + +import org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT +import org.apache.spark.rpc._ +import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId} +import org.apache.spark.storage.BlockManagerMessages._ +import org.apache.spark.util.ThreadUtils + +class CacheRecoveryManagerSuite + extends SparkFunSuite with MockitoSugar with Matchers with Eventually { + + val oneGB: Long = 1024L * 1024L * 1024L * 1024L + val plentyOfMem = Map( +BlockManagerId("1", "host", 12, None) -> ((oneGB, oneGB)), +BlockManagerId("2", "host", 12, None) -> ((oneGB, oneGB)), +BlockManagerId("3", "host", 12, None) -> ((oneGB, oneGB))) + + test("CacheRecoveryManager will replicate blocks until empty and then kill executor") { +val conf = new SparkConf() +val eam = mock[ExecutorAllocationManager] +val blocks = Seq(RDDBlockId(1, 1), RDDBlockId(2, 1)) +val bmme = FakeBMM(1, blocks.iterator, plentyOfMem) +val bmmeRef = DummyRef(bmme) +val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf) +when(eam.killExecutors(Seq("1"))).thenReturn(Seq("1")) +val result = cacheRecoveryManager.startCacheRecovery(Seq("1")) + +eventually { + verify(eam).killExecutors(Seq("1")) + bmme.replicated.get("1").get shouldBe 2 +} + +cleanup(result, cacheRecoveryManager) + } + + test("CacheRecoveryManager will kill executor if it takes too long to replicate") { +val conf = new SparkConf().set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "1s") +val eam = 
mock[ExecutorAllocationManager] +val blocks = Set(RDDBlockId(1, 1), RDDBlockId(2, 1), RDDBlockId(3, 1), RDDBlockId(4, 1)) +val bmme = FakeBMM(600, blocks.iterator, plentyOfMem) +val bmmeRef = DummyRef(bmme) +val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf) +val result = cacheRecoveryManager.startCacheRecovery(Seq("1")) + +eventually(timeout(Span(1010, Millis)), interval(Span(500, Millis))) { --- End diff -- Also, you can skip the `pause` and instead tell your FakeBMM to just stop responding after N blocks, e.g., make it only respond to the first 2 calls to `RecoverLatestRDDBlock` and just ignore the rest. (You're still waiting on the kill timer, though.)
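A minimal sketch of a fake that stops responding after N calls, with no `pause` needed. `StallingFakeBMM` and its `recoverLatestRDDBlock` method are hypothetical and only loosely mirror the PR's `FakeBMM`. Unanswered requests get a future from a `Promise` that is never completed.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}

object StallingFakeSketch {
  // Hypothetical fake endpoint: answers the first `respondTo` requests, then
  // hands back futures that never complete, simulating a stalled executor.
  final class StallingFakeBMM(respondTo: Int) {
    private val calls = new AtomicInteger(0)

    def recoverLatestRDDBlock(): Future[Boolean] =
      if (calls.incrementAndGet() <= respondTo) Future.successful(true)
      else Promise[Boolean]().future // never completed

    def callCount: Int = calls.get()
  }
}
```

The test then only has to wait for the cache-recovery timeout to fire the kill, rather than tuning a sleep against it.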
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r165486679 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala --- @@ -246,6 +251,38 @@ class BlockManagerMasterEndpoint( blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) } + private def recoverLatestRDDBlock( + execId: String, + excludeExecutors: Seq[String], + context: RpcCallContext): Unit = { +logDebug(s"Replicating first cached block on $execId") +val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get) +val response: Option[Future[Boolean]] = for { + blockManagerId <- blockManagerIdByExecutor.get(execId) + info <- blockManagerInfo.get(blockManagerId) + blocks = info.cachedBlocks.collect { case r: RDDBlockId => r } + // we assume blocks from the latest rdd are most relevant + firstBlock <- if (blocks.isEmpty) None else Some(blocks.max[RDDBlockId](Ordering.by(_.rddId))) --- End diff -- you can do `blocks.maxBy(_.rddId)`
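A small side-by-side sketch of the two forms, using a minimal stand-in case class for `RDDBlockId` (an assumption; the real class lives in `org.apache.spark.storage`). Both pick the block with the largest `rddId`; `maxBy` just derives the `Ordering` from the key function. Like `max`, `maxBy` throws on an empty collection, so the emptiness guard stays (on Scala 2.13, `maxByOption` folds the guard in).

```scala
// Minimal stand-in for Spark's RDDBlockId; only rddId matters for this comparison.
final case class RDDBlockId(rddId: Int, splitIndex: Int)

object LatestBlock {
  // Form in the diff: an explicit Ordering passed to max.
  def viaMax(blocks: Seq[RDDBlockId]): Option[RDDBlockId] =
    if (blocks.isEmpty) None else Some(blocks.max(Ordering.by[RDDBlockId, Int](_.rddId)))

  // Suggested form: maxBy builds the Ordering from the key function.
  def viaMaxBy(blocks: Seq[RDDBlockId]): Option[RDDBlockId] =
    if (blocks.isEmpty) None else Some(blocks.maxBy(_.rddId))
}
```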
[GitHub] spark issue #20474: [SPARK-23235][Core] Add executor Threaddump to api
Github user squito commented on the issue: https://github.com/apache/spark/pull/20474 Jenkins, ok to test
[GitHub] spark issue #20474: [SPARK-23235][Core] Add executor Threaddump to api
Github user squito commented on the issue: https://github.com/apache/spark/pull/20474 Jenkins, add to whitelist
[GitHub] spark pull request #20474: [SPARK-23235][Core] Add executor Threaddump to ap...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20474#discussion_r165379694 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala --- @@ -51,6 +52,21 @@ private[v1] class AbstractApplicationResource extends BaseAppResource { @Path("executors") def executorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(true)) + @GET + @Path("executors/{executorId}/threads") + def threadDump(@PathParam("executorId") executorId: String): + Option[Array[ThreadStackTrace]] = withUI { ui => +val safeExecutorId = + Option(UIUtils.stripXSS(executorId)).map { executorId => +UIUtils.decodeURLParameter(executorId) + }.getOrElse { +throw new IllegalArgumentException(s"Missing executorId parameter") + } +ui.sc.flatMap { sc => + sc.getExecutorThreadDump(safeExecutorId) --- End diff -- What happens if you give a bad executor ID? It looks like you'll just return an empty response, but I think an error might be more appropriate. In fact, it would be nice if the error distinguished between a totally bogus executorId vs. an executorId which is dead vs. calling this on the history server, where it's totally unavailable. Here's an example of that kind of error handling for the opposite case, where the endpoint is *only* available on the history server: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala?utf8=%E2%9C%93#L90-L95
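One way to model the three failure cases the comment asks for, sketched with hypothetical names (none of these types exist in Spark). The lookup returns a distinct error for the history server, for an unknown executor ID, and for a known but dead executor; the REST resource could then map each case to a different HTTP error.

```scala
// Hypothetical error model for the thread-dump endpoint; not Spark API.
sealed trait ThreadDumpError
case object RunningOnHistoryServer extends ThreadDumpError
final case class UnknownExecutor(id: String) extends ThreadDumpError
final case class ExecutorNotActive(id: String) extends ThreadDumpError

object ThreadDumpLookup {
  /**
   * @param live      whether a live SparkContext is available (false on the history server)
   * @param executors known executor IDs mapped to whether each one is still active
   * @return the validated executor ID, or the specific reason it cannot be dumped
   */
  def check(live: Boolean, executors: Map[String, Boolean], id: String): Either[ThreadDumpError, String] =
    if (!live) Left(RunningOnHistoryServer)
    else executors.get(id) match {
      case None        => Left(UnknownExecutor(id))
      case Some(false) => Left(ExecutorNotActive(id))
      case Some(true)  => Right(id)
    }
}
```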
[GitHub] spark pull request #20474: [SPARK-23235][Core] Add executor Threaddump to ap...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20474#discussion_r165377861 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala --- @@ -51,6 +52,21 @@ private[v1] class AbstractApplicationResource extends BaseAppResource { @Path("executors") def executorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(true)) + @GET + @Path("executors/{executorId}/threads") + def threadDump(@PathParam("executorId") executorId: String): + Option[Array[ThreadStackTrace]] = withUI { ui => --- End diff -- If this doesn't fit on one line, the style is still to put each param on its own line:

```scala
def threadDump(
    @PathParam("executorId") executorId: String): Option[Array[ThreadStackTrace]] = ...
```
[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20422#discussion_r165256701 --- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala --- @@ -89,26 +96,39 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa } { out2.close() } -resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2) +resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths2, dataTmp2) + +assert(indexFile.length() === (lengths.length + 1) * 8) assert(lengths2.toSeq === lengths.toSeq) assert(dataFile.exists()) assert(dataFile.length() === 30) assert(!dataTmp2.exists()) // The dataFile should be the previous one val firstByte = new Array[Byte](1) -val in = new FileInputStream(dataFile) +val dataIn = new FileInputStream(dataFile) Utils.tryWithSafeFinally { - in.read(firstByte) + dataIn.read(firstByte) } { - in.close() + dataIn.close() } assert(firstByte(0) === 0) +// The index file should not change +val secondValueOffset = new Array[Byte](8) +val indexIn = new FileInputStream(indexFile) +Utils.tryWithSafeFinally { + indexIn.read(secondValueOffset) + indexIn.read(secondValueOffset) +} { + indexIn.close() +} +assert(secondValueOffset(7) === 10, "The index file should not change") --- End diff -- Minor: here and below, it would be clearer to use `DataInputStream.readLong()` (no magic offset 7, and you check the rest of the bytes):

```scala
val indexIn = new DataInputStream(new FileInputStream(indexFile))
Utils.tryWithSafeFinally {
  indexIn.readLong() // first offset is always 0
  assert(10 === indexIn.readLong(), "The index file should not change")
} {
  indexIn.close()
}
```
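A self-contained sketch of reading an index file back with `readLong`, outside the test harness. It assumes the layout implied by the diff's own assertion, `(lengths.length + 1) * 8` bytes: a leading 0 followed by cumulative offsets, each a big-endian long. `IndexFileCheck` and its methods are illustrative names, not part of Spark.

```scala
import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream, File, FileInputStream, FileOutputStream}

object IndexFileCheck {
  // Writes 0 followed by the running total of lengths, one 8-byte long each.
  def writeIndex(file: File, lengths: Array[Long]): Unit = {
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)))
    try {
      var offset = 0L
      out.writeLong(offset)
      lengths.foreach { len => offset += len; out.writeLong(offset) }
    } finally out.close()
  }

  // Reads every offset back with readLong, so no byte position such as (7) is hard-coded.
  def readOffsets(file: File): Seq[Long] = {
    val in = new DataInputStream(new FileInputStream(file))
    try Seq.fill((file.length() / 8).toInt)(in.readLong())
    finally in.close()
  }
}
```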
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r165259010 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala --- @@ -246,6 +251,38 @@ class BlockManagerMasterEndpoint( blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) } + private def recoverLatestRDDBlock( + execId: String, + excludeExecutors: Seq[String], + context: RpcCallContext): Unit = { +logDebug(s"Replicating first cached block on $execId") +val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get) +val response: Option[Future[Boolean]] = for { + blockManagerId <- blockManagerIdByExecutor.get(execId) + info <- blockManagerInfo.get(blockManagerId) + blocks = info.cachedBlocks.collect { case r: RDDBlockId => r } --- End diff -- In one comment, you mention that you are only replicating in-memory RDD blocks. But I think this will also replicate on-disk RDD blocks. Do you want to replicate both?
[GitHub] spark pull request #20424: [Spark-23240][python] Better error message when e...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20424#discussion_r165255299 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala --- @@ -191,7 +191,20 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String daemon = pb.start() val in = new DataInputStream(daemon.getInputStream) -daemonPort = in.readInt() +try { + daemonPort = in.readInt() +} catch { + case exc: EOFException => +throw new IOException(s"No port number in $daemonModule's stdout") +} + +// test that the returned port number is within a valid range. +// note: this does not cover the case where the port number +// is arbitrary data but is also coincidentally within range +if (daemonPort < 1 || daemonPort > 0x) { --- End diff -- Ah, I see. I think you are worried about something other than what Bruce and I thought: your concern is that we might throw an exception for some values that are actually perfectly legitimate. Port 0 being special is a pretty standard thing -- it's mentioned in the constructor for ServerSocket: https://docs.oracle.com/javase/7/docs/api/java/net/ServerSocket.html#ServerSocket%28int%29, which implies that you shouldn't ever open a Socket on port 0, though I don't see that officially documented. At least on my laptop, I get different errors if I try to connect to port 0 vs.
just connecting to a bogus port:

```scala
scala> val s2 = new Socket("localhost", 1234)
java.net.ConnectException: Connection refused
  at java.net.PlainSocketImpl.socketConnect(Native Method)
  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
  at java.net.Socket.connect(Socket.java:589)
  at java.net.Socket.connect(Socket.java:538)
  at java.net.Socket.<init>(Socket.java:434)
  at java.net.Socket.<init>(Socket.java:211)
  ... 29 elided

scala> val s3 = new Socket("localhost", 0)
java.net.NoRouteToHostException: Can't assign requested address
  at java.net.PlainSocketImpl.socketConnect(Native Method)
  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
  at java.net.Socket.connect(Socket.java:589)
  at java.net.Socket.connect(Socket.java:538)
  at java.net.Socket.<init>(Socket.java:434)
  at java.net.Socket.<init>(Socket.java:211)
  ... 29 elided
```

So I think it's pretty safe to say that daemon.py (or whatever) shouldn't be passing back `0` as the port to bind to. Still, it is *clearly* safer to have the port written to some other file, or to (another) socket, so that we wouldn't have to worry about the details of this error handling.
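A standalone sketch of the read-and-validate step being discussed, assuming the valid range is the TCP port range 1 to 65535 (port 0 rejected, per the argument above). `DaemonPort` is an illustrative name; the error messages mirror the ones in the diff, and `bytesOf` is a hypothetical helper used only to build test input.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException, IOException}

object DaemonPort {
  // Reads a 4-byte big-endian int and rejects values outside 1..65535.
  def read(in: DataInputStream, source: String): Int = {
    val port = try in.readInt() catch {
      case _: EOFException => throw new IOException(s"No port number in $source's stdout")
    }
    if (port < 1 || port > 0xffff) {
      throw new IOException(f"Bad port number in $source's stdout: 0x$port%08x")
    }
    port
  }

  // Hypothetical helper: encodes an int the way a daemon writing 4 raw bytes would.
  def bytesOf(i: Int): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    out.writeInt(i)
    out.flush()
    bos.toByteArray
  }
}
```

As the comment notes, a range check cannot catch stray output that happens to decode to an in-range value.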
[GitHub] spark issue #20461: [SPARK-23289][CORE]OneForOneBlockFetcher.DownloadCallbac...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20461 lgtm
[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r165214410 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java --- @@ -126,4 +150,38 @@ private void failRemainingBlocks(String[] failedBlockIds, Throwable e) { } } } + + private class DownloadCallback implements StreamCallback { + +private WritableByteChannel channel = null; +private File targetFile = null; +private int chunkIndex; + +public DownloadCallback(File targetFile, int chunkIndex) throws IOException { + this.targetFile = targetFile; + this.channel = Channels.newChannel(new FileOutputStream(targetFile)); + this.chunkIndex = chunkIndex; +} + +@Override +public void onData(String streamId, ByteBuffer buf) throws IOException { + channel.write(buf); --- End diff -- Right, I realize there isn't a simple one-line change here to switch to using `spliceTo`; I was wondering what the behavior is. I actually thought zero-copy and off-heap were orthogonal -- any time Netty gives you direct access to the bytes, they have to be copied to user space, right?
[GitHub] spark pull request #20461: [SPARK-23289][CORE]OneForOneBlockFetcher.Download...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20461#discussion_r165213755 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java --- @@ -171,7 +171,9 @@ private void failRemainingBlocks(String[] failedBlockIds, Throwable e) { @Override public void onData(String streamId, ByteBuffer buf) throws IOException { - channel.write(buf); + while (buf.hasRemaining()) { +channel.write(buf); --- End diff -- I actually thought this was OK for a FileChannel (based on this comment in the original: https://github.com/apache/spark/pull/16989/files#r115409001), but this certainly seems like a safe change. Just wondering whether you observed an issue, or whether this is just to be safe.
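The loop in the diff follows the `WritableByteChannel.write` contract: unless a channel specifies otherwise, a write returns only after writing all requested bytes, but some channel types (for example, a non-blocking socket channel) may write only part of the buffer. A minimal Scala sketch of the same pattern, written against an in-memory channel for illustration (`DrainBuffer` is an illustrative name):

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.{Channels, WritableByteChannel}

object DrainBuffer {
  // Keeps calling write until the buffer is empty; returns the total bytes written.
  def writeFully(channel: WritableByteChannel, buf: ByteBuffer): Int = {
    var written = 0
    while (buf.hasRemaining) {
      written += channel.write(buf)
    }
    written
  }
}
```

For a blocking channel the loop body runs once, so the change costs nothing there.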
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r165207476 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala --- @@ -246,6 +251,38 @@ class BlockManagerMasterEndpoint( blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) } + private def recoverLatestRDDBlock( + execId: String, + excludeExecutors: Seq[String], + context: RpcCallContext): Unit = { +logDebug(s"Replicating first cached block on $execId") +val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get) +val response: Option[Future[Boolean]] = for { + blockManagerId <- blockManagerIdByExecutor.get(execId) + info <- blockManagerInfo.get(blockManagerId) + blocks = info.cachedBlocks.collect { case r: RDDBlockId => r } + // we assume blocks from the latest rdd are most relevant --- End diff -- On first read, I thought you were _only_ replicating the latest rdd. Maybe expand this comment to: "As a heuristic, prioritize replicating the latest rdd. If this succeeds, CacheRecoveryManager will try to replicate the remaining rdds."
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r165207798 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala --- @@ -246,6 +251,38 @@ class BlockManagerMasterEndpoint( blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) } + private def recoverLatestRDDBlock( + execId: String, + excludeExecutors: Seq[String], + context: RpcCallContext): Unit = { +logDebug(s"Replicating first cached block on $execId") +val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get) +val response: Option[Future[Boolean]] = for { + blockManagerId <- blockManagerIdByExecutor.get(execId) + info <- blockManagerInfo.get(blockManagerId) + blocks = info.cachedBlocks.collect { case r: RDDBlockId => r } + // we assume blocks from the latest rdd are most relevant + firstBlock <- if (blocks.isEmpty) None else Some(blocks.max[RDDBlockId](Ordering.by(_.rddId))) + replicaSet <- blockLocations.asScala.get(firstBlock) + // Add 2 below because you need the number of replicas, plus one for the original, plus one + // for the new replica. + maxReps = replicaSet.size + 2 --- End diff -- doesn't blockLocations already include the original?
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r165210078 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Success => Succ} +import scala.util.Failure + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.storage.{BlockManagerId, RDDBlockId} +import org.apache.spark.storage.BlockManagerMessages._ +import org.apache.spark.util.ThreadUtils + +/** + * Responsible for asynchronously replicating all of an executor's cached blocks, and then shutting + * it down. 
+ */ +final private class CacheRecoveryManager( +blockManagerMasterEndpoint: RpcEndpointRef, +executorAllocationManager: ExecutorAllocationManager, +conf: SparkConf) + extends Logging { + + private val forceKillAfterS = conf.get(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT) + private val threadPool = ThreadUtils.newDaemonCachedThreadPool("cache-recovery-manager-pool") + private implicit val asyncExecutionContext: ExecutionContext = +ExecutionContext.fromExecutorService(threadPool) + private val scheduler = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("cache-recovery-shutdown-timers") + private val recoveringExecutors: mutable.Set[String] = +ConcurrentHashMap.newKeySet[String]().asScala + + /** + * Start the recover cache shutdown process for these executors + * + * @param execIds the executors to start shutting down + */ + def startCacheRecovery(execIds: Seq[String]): Unit = { +logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.") +val canBeRecovered = checkMem(execIds) +recoveringExecutors ++= canBeRecovered +val executorsWithKillTimers = canBeRecovered.zip(canBeRecovered.map(startKillTimer)) +executorsWithKillTimers.foreach((replicateUntilDone _).tupled) + } + + /** + * Given a list of executors that will be shut down, check if there is enough free memory on the + * rest of the cluster to hold their data. Return a list of just the executors for which there + * will be enough space. Executors are included smallest first. + * + * @param execIds executors which will be shut down + * @return a Seq of the executors we do have room for + */ + private def checkMem(execIds: Seq[String]): Seq[String] = { +val execsToShutDown = execIds.toSet +// Memory Status is a map of executor Id to a tuple of Max Memory and remaining memory on that +// executor. 
+val allExecMemStatus: Map[String, (Long, Long)] = blockManagerMasterEndpoint + .askSync[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus) + .map { case (blockManagerId, mem) => blockManagerId.executorId -> mem } + +val (expiringMemStatus, remainingMemStatus) = allExecMemStatus.partition { + case (execId, _) => execsToShutDown.contains(execId) +} +val freeMemOnRemaining = remainingMemStatus.values.map(_._2).sum + +// The used mem on each executor sorted from least used mem to greatest +val executorAndUsedMem: Seq[(String, Long)] = + expiringMemStatus.map { case (execId, (maxMem, remainingMem)) => +val usedMem = maxMem - remainingMem +execId -> usedMem + }.toSeq.sortBy { case (_, usedMem) => usedMem } + +executorAndUsedMem + .scan((
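The quoted `checkMem` is cut off at `.scan((`, so its exact implementation is not shown here. The doc comment's contract (executors included smallest first, only as many as fit in the free memory of the executors that stay up) can be sketched as a greedy prefix over a running total. This is an illustrative reconstruction under that assumption, not the PR's code; `CheckMemSketch` is a hypothetical name.

```scala
object CheckMemSketch {
  /**
   * @param memStatus  executorId -> (maxMem, remainingMem), as in the quoted comment
   * @param toShutDown executors that are about to be shut down
   * @return the expiring executors whose cached data fits elsewhere, smallest used memory first
   */
  def executorsThatFit(memStatus: Map[String, (Long, Long)], toShutDown: Set[String]): Seq[String] = {
    val (expiring, remaining) = memStatus.partition { case (id, _) => toShutDown.contains(id) }
    val freeOnRemaining = remaining.values.map(_._2).sum
    val byUsed = expiring.toSeq
      .map { case (id, (maxMem, remainingMem)) => id -> (maxMem - remainingMem) }
      .sortBy(_._2)
    // Running total of used memory after including each executor in turn.
    val cumulative = byUsed.map(_._2).scanLeft(0L)(_ + _).tail
    byUsed.zip(cumulative).takeWhile { case (_, total) => total <= freeOnRemaining }.map(_._1._1)
  }
}
```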
[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r165178844 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java --- @@ -126,4 +150,38 @@ private void failRemainingBlocks(String[] failedBlockIds, Throwable e) { } } } + + private class DownloadCallback implements StreamCallback { + +private WritableByteChannel channel = null; +private File targetFile = null; +private int chunkIndex; + +public DownloadCallback(File targetFile, int chunkIndex) throws IOException { + this.targetFile = targetFile; + this.channel = Channels.newChannel(new FileOutputStream(targetFile)); + this.chunkIndex = chunkIndex; +} + +@Override +public void onData(String streamId, ByteBuffer buf) throws IOException { + channel.write(buf); --- End diff -- I am super late reviewing this, apologies. I'm just asking questions for my own understanding and to consider possible future improvements: this won't do a zero-copy transfer, will it? That ByteBuffer is still in user space? From my understanding, we'd need special handling to use Netty's `spliceTo` when possible: https://stackoverflow.com/questions/30322957/is-there-transferfrom-like-functionality-in-netty-for-zero-copy. But I'm still working on putting all the pieces together here, and admittedly this is outside my area of expertise.
[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20422#discussion_r165161260 --- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala --- @@ -133,4 +133,65 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa } assert(firstByte2(0) === 2) } + + test("SPARK-23253: index files should be created properly") { --- End diff -- Thanks for adding this, but I'm not sure this covers any cases that the previous test doesn't already cover, does it? I was thinking of just adding something to read the actual index file and make sure it has the right values to go with the update to the data file (or no update, in some cases). You may have added a couple more asserts than the original test; if so, maybe they can just be added to the original?
[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20422#discussion_r165159188 --- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala --- @@ -133,4 +133,65 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa } assert(firstByte2(0) === 2) } + + test("SPARK-23253: index files should be created properly") { +val shuffleId = 1 +val mapId = 2 +val idxName = s"shuffle_${shuffleId}_${mapId}_0.index" +val resolver = new IndexShuffleBlockResolver(conf, blockManager) + +val lengths = (1 to 2).map(_ => 8L).toArray --- End diff -- you could do `Array.fill(2)(8L)`
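For reference, the two expressions build the same array; `Array.fill(n)(elem)` evaluates `elem` once per slot and skips the intermediate `Range` and `IndexedSeq`. Note that arrays compare by reference with `==`, so a check needs `sameElements` (or a conversion to `Seq`).

```scala
object FillDemo {
  // Form in the diff: build a Range, map it, then copy into an array.
  val viaMap: Array[Long] = (1 to 2).map(_ => 8L).toArray
  // Suggested form: allocate and fill directly.
  val viaFill: Array[Long] = Array.fill(2)(8L)
}
```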
[GitHub] spark issue #20422: [SPARK-23253][Core][Shuffle]Only write shuffle temporary...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20422 @jerryshao are you OK with making this change? I think our original comments crossed paths as I was taking a closer look.
[GitHub] spark pull request #20424: [Spark-23240][python] Better error message when e...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20424#discussion_r165116866 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala --- @@ -191,7 +191,20 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String daemon = pb.start() val in = new DataInputStream(daemon.getInputStream) -daemonPort = in.readInt() +try { + daemonPort = in.readInt() +} catch { + case exc: EOFException => +throw new IOException(s"No port number in $daemonModule's stdout") +} + +// test that the returned port number is within a valid range. +// note: this does not cover the case where the port number +// is arbitrary data but is also coincidentally within range +if (daemonPort < 1 || daemonPort > 0x) { --- End diff -- Yeah, I could go either way on this. Personally, I think just the enhanced error message you have above would be useful, without going through the added trouble of using another approach. I'll defer to your opinion, @HyukjinKwon.
[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20408#discussion_r164874798 --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala --- @@ -423,7 +425,8 @@ private class LiveStage extends LiveEntity { newAccumulatorInfos(info.accumulables.values), None, None, - killedSummary) + killedSummary, + blackListedExecutors) --- End diff -- Oh hmm, I had actually just meant storing this as a variable in `LiveStage`, but not including it in `v1.StageData`. Do you think it's useful to have this in the API itself? Opinions, @tgravescs? It's already present as a per-executor flag, but there is no summary for the entire stage giving a list of the blacklisted executors. Just having it in `LiveStage` is enough to get the performance improvement I mentioned earlier.
[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20408#discussion_r164886701 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -228,9 +230,12 @@ private[spark] class AppStatusListener( // Implicitly blacklist every available executor for the stage associated with this node Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage => - liveExecutors.values.filter(_.host == event.hostId).foreach { exec => -setStageBlackListStatus(stage, exec.executorId, now) + val executorIds = liveExecutors.values.filter(_.host == event.hostId).map(_.executorId) + executorIds.foreach { executorId => +setStageBlackListStatus(stage, executorId, now) } + stage.blackListedExecutors ++= executorIds + maybeUpdate(stage, now) --- End diff -- If `setStageBlackListStatus` took a Seq or varargs for multiple executorIds, you could push these other updates down into it.
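A sketch of the varargs shape being suggested, using a hypothetical, stripped-down `StageModel` instead of the real `LiveStage` (only the field relevant here). Because Scala varargs must come last, `now` moves before the executor IDs in this sketch; the counter stands in for `maybeUpdate(stage, now)`, so a whole node is recorded with one update.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for LiveStage.
final class StageModel {
  val blackListedExecutors: mutable.Set[String] = mutable.Set[String]()
  var updates = 0
}

object BlacklistHelper {
  // Records every executor on the node, then performs a single update.
  def setStageBlackListStatus(stage: StageModel, now: Long, executorIds: String*): Unit = {
    stage.blackListedExecutors ++= executorIds
    stage.updates += 1 // stands in for maybeUpdate(stage, now)
  }
}
```

A caller holding a collection would pass it as `setStageBlackListStatus(stage, now, executorIds.toSeq: _*)`.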
[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20408#discussion_r164824439 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -594,12 +606,24 @@ private[spark] class AppStatusListener( stage.executorSummaries.values.foreach(update(_, now)) update(stage, now, last = true) + + val executorIdsForStage = stage.executorSummaries.keySet + executorIdsForStage.foreach { executorId => +liveExecutors.get(executorId).foreach { exec => + removeBlackListedStageFrom(exec, event.stageInfo.stageId, now) +} + } } appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1) kvstore.write(appSummary) } + private def removeBlackListedStageFrom(exec: LiveExecutor, stageId: Int, now: Long) = { +exec.blacklistedInStages -= stageId +liveUpdate(exec, now) --- End diff -- Hmm, actually I just thought of something else. It looks like you're calling `liveUpdate` here for *every* executor when the stage finishes. Say you have 1000 execs, a very quick stage, and no blacklisting: this is an expensive update for no actual change. So you should at least avoid the `liveUpdate` if `exec.blacklistedInStages` hasn't changed at all. But really, I think `LiveStage` should maintain a set of blacklisted executors, so you avoid calling this entirely for execs that aren't blacklisted.
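The minimal version of the first suggestion relies on `mutable.Set.remove`, which returns `true` only if the element was present, so executors never blacklisted in the stage skip the update. `ExecModel` is a hypothetical, stripped-down stand-in for `LiveExecutor`, and the counter stands in for `liveUpdate(exec, now)`; the stronger fix, a per-stage set of blacklisted executors, would avoid even iterating over the others.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for LiveExecutor.
final class ExecModel(val id: String) {
  val blacklistedInStages: mutable.Set[Int] = mutable.Set[Int]()
  var liveUpdates = 0
}

object StageCompletion {
  // Only pays for the update when the stage was actually in the set.
  def removeBlackListedStageFrom(exec: ExecModel, stageId: Int): Unit = {
    if (exec.blacklistedInStages.remove(stageId)) {
      exec.liveUpdates += 1 // stands in for liveUpdate(exec, now)
    }
  }
}
```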
[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20408#discussion_r164774722 --- Diff: core/src/main/resources/org/apache/spark/ui/static/executorspage.js --- @@ -416,8 +422,7 @@ $(document).ready(function () { }, {data: 'hostPort'}, {data: 'isActive', render: function (data, type, row) { -if (row.isBlacklisted) return "Blacklisted"; -else return formatStatus (data, type); +return formatStatus (data, type, row); --- End diff -- OK, never mind -- now I see the old code didn't seem to follow the convention in the rest of this file. I'm not sure about the style guide for JS, but I think this might be a bit cleaner if we put each element on its own line, so the nesting is clearer, like it is here: https://github.com/apache/spark/blob/master/core/src/main/resources/org/apache/spark/ui/static/executorspage.js#L459-L476
[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20408#discussion_r164766057 --- Diff: core/src/main/resources/org/apache/spark/ui/static/executorspage.js --- @@ -416,8 +422,7 @@ $(document).ready(function () { }, {data: 'hostPort'}, {data: 'isActive', render: function (data, type, row) { -if (row.isBlacklisted) return "Blacklisted"; -else return formatStatus (data, type); +return formatStatus (data, type, row); --- End diff -- weird indentation here?
[GitHub] spark pull request #20424: [Spark-23240][python] Better error message when e...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20424#discussion_r164765247 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala --- @@ -191,7 +191,20 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String daemon = pb.start() val in = new DataInputStream(daemon.getInputStream) -daemonPort = in.readInt() +try { + daemonPort = in.readInt() +} catch { + case exc: EOFException => +throw new IOException(s"No port number in $daemonModule's stdout") +} + +// test that the returned port number is within a valid range. +// note: this does not cover the case where the port number +// is arbitrary data but is also coincidentally within range +if (daemonPort < 1 || daemonPort > 0x) { --- End diff -- Yeah, this is kind of what I was getting at below -- what value are we adding with this extra handling over the original exception? Another possibility is to change this to not use stdout, but that adds more complexity. You could use sockets, or write the port to some dedicated temporary file.
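A sketch of the temporary-file alternative, under stated assumptions: the parent chooses a path (for example with `Files.createTempFile`), hands it to the daemon by some means not shown here, and reads the port back as text. Stray output on the daemon's stdout then cannot be mistaken for the port. `PortFileHandoff` is a hypothetical name. A real handoff would also need the parent to wait until the file is completely written (for instance, the child writes a temporary name and renames it); that part is omitted.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

object PortFileHandoff {
  // Daemon side (simulated here): write the port as text to the agreed-upon file.
  def writePort(file: Path, port: Int): Unit =
    Files.write(file, port.toString.getBytes(StandardCharsets.UTF_8))

  // Parent side: parse and range-check the port; non-numeric content throws NumberFormatException.
  def readPort(file: Path): Int = {
    val text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim
    val port = text.toInt
    require(port >= 1 && port <= 0xffff, s"Bad port number in $file: $text")
    port
  }
}
```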
[GitHub] spark pull request #20424: [Spark-23240][python] Better error message when e...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20424#discussion_r164637553 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala --- @@ -191,7 +191,20 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String daemon = pb.start() val in = new DataInputStream(daemon.getInputStream) -daemonPort = in.readInt() +try { + daemonPort = in.readInt() +} catch { + case exc: EOFException => +throw new IOException(s"No port number in $daemonModule's stdout") +} + +// test that the returned port number is within a valid range. +// note: this does not cover the case where the port number +// is arbitrary data but is also coincidentally within range +if (daemonPort < 1 || daemonPort > 0x) { + throw new IOException(s"Bad port number in $daemonModule's stdout: " + +f"0x$daemonPort%08x") --- End diff -- Just a thought: this error message won't mean much to the typical user. Would it be sensible to tell the user exactly which Python command to run themselves to figure out the problem? E.g. "unexpected stdout from /foo/bar/some/path/to/python -m /path/to/daemon.py". That's what would help with that sitecustomize.py case. Or is that not useful in general?
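One way to act on this suggestion, as a hedged sketch: build the message from the actual command line so the user can rerun it by hand. The helper name and the example arguments are hypothetical. The hex formatting mirrors the `f"0x$daemonPort%08x"` in the diff.

```scala
// Hypothetical helper: put the exact command in the error so the user can run it
// manually and see what is being written to stdout.
object DaemonErrors {
  def badPortMessage(pythonExec: String, daemonModule: String, port: Int): String = {
    val command = Seq(pythonExec, "-m", daemonModule).mkString(" ")
    f"Bad port number in $daemonModule's stdout: 0x$port%08x. " +
      s"Run '$command' manually to see what it writes to stdout."
  }
}
```

For example, `DaemonErrors.badPortMessage("/usr/bin/python3", "pyspark.daemon", 1)` names both the decoded value (`0x00000001`) and the command `/usr/bin/python3 -m pyspark.daemon`.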
[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20422#discussion_r164635886 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala --- @@ -166,8 +153,20 @@ private[spark] class IndexShuffleBlockResolver( if (dataTmp != null && dataTmp.exists()) { dataTmp.delete() } - indexTmp.delete() } else { + val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp))) --- End diff -- Move this below the comment "This is the first successful attempt". I'd also include a comment about why we write to a temporary file even though we're always going to rename it (in case the task dies somehow, we'd prefer not to leave a half-written index file in the final location).
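A sketch of the write-to-temp-then-rename pattern described above. The object and method names are hypothetical. The file layout is modeled on what `IndexShuffleBlockResolver` writes: a leading 0 followed by cumulative offsets, one `Long` per partition boundary. This is not Spark's implementation, and it omits the checks against existing map output.

```scala
import java.io.{BufferedOutputStream, DataOutputStream, File, FileOutputStream, IOException}
import java.util.UUID

// Hypothetical sketch: if the task dies mid-write, only the uniquely named temp
// file is left half-written; the final index file is never partially written.
object IndexFileWriter {
  def writeIndexFile(indexFile: File, partitionLengths: Array[Long]): Unit = {
    val tmp = new File(indexFile.getParentFile, s"${indexFile.getName}.${UUID.randomUUID()}")
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(tmp)))
    try {
      // Offsets are cumulative and start at 0, so partition i spans
      // [offset(i), offset(i + 1)) in the data file.
      var offset = 0L
      out.writeLong(offset)
      partitionLengths.foreach { length =>
        offset += length
        out.writeLong(offset)
      }
    } finally {
      out.close()
    }
    if (indexFile.exists() && !indexFile.delete()) {
      throw new IOException(s"Failed to delete existing index file $indexFile")
    }
    if (!tmp.renameTo(indexFile)) {
      throw new IOException(s"Failed to rename $tmp to $indexFile")
    }
  }
}
```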
[GitHub] spark issue #20422: [SPARK-23253][Core][Shuffle]Only write shuffle temporary...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20422 Jenkins, ok to test
[GitHub] spark issue #20422: [SPARK-23253][Core][Shuffle]Only write shuffle temporary...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20422 Thanks for taking a look at this @yaooqinn. To clarify -- there is no bug you are trying to fix here, is there? It's just an optimization? From a quick glance I think the change seems correct ... but it also seems like such a minor improvement that I'm not sure I see the value in changing this.
[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20408#discussion_r164627546 --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala --- @@ -254,6 +255,7 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE var totalShuffleRead = 0L var totalShuffleWrite = 0L var isBlacklisted = false + var blacklistedInStages: Set[Int] = TreeSet() --- End diff -- Yeah, that's a good reason, sorry I should have thought of that!
[GitHub] spark issue #20408: [SPARK-23189][Core][Web UI] Reflect stage level blacklis...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20408 Code looks good to me, but let's see what @tgravescs and @ajbozarth say. @ajbozarth it is wordy, but I think `Active (Blacklisted in Stages: [...])` is probably the best of the options so far.
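To make the proposed label concrete, here is a hedged Scala rendering of the status logic. The real code is `formatStatus` in `executorspage.js`, and this sketch also folds the `row.isBlacklisted` check into the same function. The `Dead` label for inactive executors and the sorted, comma-separated stage list are assumptions about the display, not confirmed behavior.

```scala
// Hypothetical Scala version of the executor status label discussed above.
object ExecutorStatus {
  def format(isActive: Boolean, isBlacklisted: Boolean, blacklistedInStages: Set[Int]): String =
    if (isBlacklisted) "Blacklisted"                 // application-level blacklisting wins
    else if (!isActive) "Dead"
    else if (blacklistedInStages.isEmpty) "Active"
    else s"Active (Blacklisted in Stages: [${blacklistedInStages.toSeq.sorted.mkString(", ")}])"
}
```

For example, `ExecutorStatus.format(true, false, Set(3, 1))` renders as `Active (Blacklisted in Stages: [1, 3])`.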
[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20408#discussion_r164584678 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -594,12 +606,24 @@ private[spark] class AppStatusListener( stage.executorSummaries.values.foreach(update(_, now)) update(stage, now, last = true) + + val executorIdsForStage = stage.executorSummaries.keySet + executorIdsForStage.foreach { executorId => +liveExecutors.get(executorId).foreach { exec => + removeBlackListedStageFrom(exec, event.stageInfo.stageId, now) --- End diff -- whoops, my fault! thanks for explaining
[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20408#discussion_r164588595 --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala --- @@ -254,6 +255,7 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE var totalShuffleRead = 0L var totalShuffleWrite = 0L var isBlacklisted = false + var blacklistedInStages: Set[Int] = TreeSet() --- End diff -- Any particular reason you chose `TreeSet`, and not just `scala.collection.immutable.Set`? (Scala has a default implementation that is specialized for a small number of items and then switches to a `HashSet`.) See the use of `Map` as the implementation for `executorLogs`.
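A small comparison to illustrate the point about Scala's default immutable `Set`. It uses specialized classes (`Set1` to `Set4`) for up to four elements and a `HashSet` beyond that. `TreeSet`, by contrast, keeps its elements sorted at O(log n) cost per update. The helper names are hypothetical.

```scala
import scala.collection.immutable.TreeSet

object SetChoice {
  // Default immutable Set: Set1..Set4 for up to four elements, a HashSet after that.
  // Iteration order is unspecified.
  def defaultSet(stages: Seq[Int]): Set[Int] = Set(stages: _*)

  // TreeSet keeps elements sorted (O(log n) per update), so iteration is ordered.
  def sortedSet(stages: Seq[Int]): TreeSet[Int] = TreeSet(stages: _*)
}
```

If ordered display is the only requirement, the alternative is to sort at render time, as the status sketch above does with `toSeq.sorted`.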
[GitHub] spark issue #20161: [SPARK-21525][streaming] Check error code from superviso...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20161 lgtm
[GitHub] spark issue #20399: [SPARK-23209][core] Allow credential manager to work whe...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20399 I am merging this now to master & 2.3
[GitHub] spark pull request #20399: [SPARK-23209][core] Allow credential manager to w...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20399#discussion_r164223721 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala --- @@ -75,6 +75,17 @@ private[spark] class HadoopDelegationTokenManager( .toMap } + private def safeCreateProvider( + createFn: => HadoopDelegationTokenProvider): Option[HadoopDelegationTokenProvider] = { +try { + Some(createFn) +} catch { + case t: Throwable => +logDebug(s"Failed to load built in provider.", t) --- End diff -- I think debug is right, actually -- we have no idea at this point whether the user wants these credential providers, and it could be totally fine if they're missing, e.g. if they never want to talk to Hive. (Also, I don't really care that much and don't want to bike-shed on this ...)
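A sketch of the `safeCreateProvider` pattern, with a plain logging callback standing in for Spark's `logDebug` (hypothetical `SafeCreate` object). One detail is worth noting. Catching `Throwable` rather than using `NonFatal` matters here, because a missing optional dependency typically surfaces as `NoClassDefFoundError`. That is a `LinkageError`, which `NonFatal` deliberately does not match. The trade-off is that genuinely fatal errors are swallowed as well.

```scala
// Hypothetical sketch: evaluate a by-name constructor and turn any failure,
// including linkage errors from missing optional jars, into None.
object SafeCreate {
  def safeCreate[T](createFn: => T)(logDebug: (String, Throwable) => Unit): Option[T] =
    try {
      Some(createFn)
    } catch {
      // NonFatal would NOT match NoClassDefFoundError, hence Throwable.
      case t: Throwable =>
        logDebug("Failed to load built in provider.", t)
        None
    }
}
```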
[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20408#discussion_r164177919 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -594,12 +606,24 @@ private[spark] class AppStatusListener( stage.executorSummaries.values.foreach(update(_, now)) update(stage, now, last = true) + + val executorIdsForStage = stage.executorSummaries.keySet + executorIdsForStage.foreach { executorId => +liveExecutors.get(executorId).foreach { exec => + removeBlackListedStageFrom(exec, event.stageInfo.stageId, now) --- End diff -- I'm just doing a really quick scan here, but I don't understand why changes here are necessary. You don't get these events for blacklisting within a stage. Or is this a bug in the current code, and something we should fix in the pending 2.3 release?
[GitHub] spark pull request #20408: [SPARK-23189][Core][Web UI] Reflect stage level b...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20408#discussion_r164175040 --- Diff: core/src/main/resources/org/apache/spark/ui/static/executorspage.js --- @@ -25,9 +25,13 @@ function getThreadDumpEnabled() { return threadDumpEnabled; } -function formatStatus(status, type) { +function formatStatus(status, type, row) { --- End diff -- can you move the
```js
if (row.isBlacklisted) return "Blacklisted";
```
into this function as well? No reason for it to be separated.
[GitHub] spark pull request #20399: [SPARK-23209][core] Allow credential manager to w...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20399#discussion_r163989326 --- Diff: core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala --- @@ -110,7 +111,64 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { creds.getAllTokens.size should be (0) } + test("SPARK-23209: obtain tokens when Hive classes are not available") { +// This test needs a custom class loader to hide Hive classes which are in the classpath. +// Because the manager code loads the Hive provider directly instead of using reflection, we +// need to drive the test through the custom class loader so a new copy that cannot find +// Hive classes is loaded. +val currentLoader = Thread.currentThread().getContextClassLoader() +val noHive = new ClassLoader() { + override def loadClass(name: String, resolve: Boolean): Class[_] = { +if (name.startsWith("org.apache.hive") || name.startsWith("org.apache.hadoop.hive")) { + throw new ClassNotFoundException(name) +} + +if (name.startsWith("java") || name.startsWith("scala")) { + currentLoader.loadClass(name) +} else { + val classFileName = name.replaceAll("\\.", "/") + ".class" + val in = currentLoader.getResourceAsStream(classFileName) + if (in != null) { +val bytes = IOUtils.toByteArray(in) +defineClass(name, bytes, 0, bytes.length) + } else { +throw new ClassNotFoundException(name) + } +} + } +} + +try { + Thread.currentThread().setContextClassLoader(noHive) + val test = noHive.loadClass(NoHiveTest.getClass.getName().stripSuffix("$")) + test.getMethod("main", classOf[Array[String]]).invoke(null, Array[String]()) +} finally { + Thread.currentThread().setContextClassLoader(currentLoader) +} + } + private[spark] def hadoopFSsToAccess(hadoopConf: Configuration): Set[FileSystem] = { Set(FileSystem.get(hadoopConf)) } } + +/** Test code for SPARK-23209 to avoid using too much reflection above. 
*/ +private object NoHiveTest extends Matchers { + + def main(args: Array[String]): Unit = { --- End diff -- super minor: can you name this something other than "main"? It makes it seem like you're launching it as a separate process (maybe left over from an earlier attempt?)
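A simplified, hypothetical version of the hiding class loader idea from the test above. Note its main limitation: classes that this loader delegates to the parent resolve their own dependencies through the parent, so they would still see Hive. That is presumably why the test re-defines classes from their bytes with `defineClass` instead of only delegating.

```scala
// Hypothetical loader that refuses classes under the given package prefixes and
// delegates everything else to its parent.
class HidingClassLoader(parent: ClassLoader, hiddenPrefixes: Seq[String])
    extends ClassLoader(parent) {

  override def loadClass(name: String, resolve: Boolean): Class[_] = {
    if (hiddenPrefixes.exists(prefix => name.startsWith(prefix))) {
      throw new ClassNotFoundException(name)
    }
    super.loadClass(name, resolve)
  }
}
```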
[GitHub] spark pull request #20399: [SPARK-23209][core] Allow credential manager to w...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20399#discussion_r163980133 --- Diff: core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala --- @@ -110,7 +111,64 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { creds.getAllTokens.size should be (0) } + test("SPARK-23209: obtain tokens when Hive classes are not available") { +// This test needs a custom class loader to hide Hive classes which are in the classpath. +// Because the manager code loads the Hive provider directly instead of using reflection, we --- End diff -- Just for my understanding, is there any reason the manager should load the code directly, rather than using reflection to guard against this? I guess either way is fine; I've just seen us use reflection more often to guard against this.
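For comparison, a minimal reflection guard, sketched with a hypothetical `OptionalClass` helper. The class is looked up by name, and its absence becomes `None` rather than a linkage error raised later. The cost is that the guarded code loses compile-time checking.

```scala
// Hypothetical reflection guard: probe for an optional class by name.
object OptionalClass {
  def load(className: String, loader: ClassLoader): Option[Class[_]] =
    try {
      // initialize = false: only check that the class can be found and linked.
      Some(Class.forName(className, false, loader))
    } catch {
      case _: ClassNotFoundException | _: LinkageError => None
    }
}
```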
[GitHub] spark issue #20203: [SPARK-22577] [core] executor page blacklist status shou...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20203 merged to master. thanks @attilapiros
[GitHub] spark issue #20203: [SPARK-22577] [core] executor page blacklist status shou...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20203 @attilapiros can you please update the PR description to also mention node blacklisting (at least briefly), file a JIRA for the follow-up work, and ping me & Tom on it? lgtm
[GitHub] spark issue #20203: [SPARK-22577] [core] executor page blacklist status shou...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20203 I tried this out on a cluster and it seemed fine. I also tried a bad app where all tasks fail (so it's not just because of the host), and all the executors show up as blacklisted, but I guess that is fine.
```scala
sc.parallelize(1 to 1000, 10).map { x => throw new RuntimeException("bad application") }.count()
```
(screenshot: https://user-images.githubusercontent.com/71240/35247305-88bceb4c-ff8f-11e7-9e98-285ffd604f46.png)
It's about the same if there is just one bad task:
```scala
import org.apache.spark.TaskContext

sc.parallelize(1 to 1000, 10).map { x =>
  if (TaskContext.get().partitionId() == 0) throw new RuntimeException("bad task") else 0
}.count()
```
Of course, if you have more executors, then a whole bunch of them show up as blacklisted, but it still seems fine.
[GitHub] spark issue #19992: [SPARK-22805][CORE] Use StorageLevel aliases in event lo...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19992 Thanks for looking into this @superbobry -- can you actually close this yourself? We can't directly close it (there is a way, but it's more complicated).
[GitHub] spark issue #20203: [SPARK-22577] [core] executor page blacklist status shou...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20203 @attilapiros test failures look real (you probably just need to regenerate some of those expectations).
[GitHub] spark issue #20203: [SPARK-22577] [core] executor page blacklist status shou...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20203 btw another way you could test out having a bad host would be something like this (untested):
```scala
import java.net.InetAddress

val hosts = sc.parallelize(1 to 1, 100).map { _ => InetAddress.getLocalHost().getHostName() }.collect().toSet
val badHost = hosts.head
sc.parallelize(1 to 1, 10).map { x =>
  // Fail every task that happens to run on the chosen host.
  if (InetAddress.getLocalHost().getHostName() == badHost) throw new RuntimeException("Bad host")
  else (x % 3, x)
}.reduceByKey((a, b) => a + b).collect()
```
That way you make sure the failures are consistently on one host, not dependent on higher executor ids getting concentrated on one host.
[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r162714257 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala --- @@ -59,31 +60,55 @@ class TaskSetBlacklistSuite extends SparkFunSuite with BeforeAndAfterEach with M val shouldBeBlacklisted = (executor == "exec1" && index == 0) assert(taskSetBlacklist.isExecutorBlacklistedForTask(executor, index) === shouldBeBlacklisted) } + assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1")) +verify(listenerBusMock, never()) + .post(isA(classOf[SparkListenerExecutorBlacklistedForStage])) + assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA")) +verify(listenerBusMock, never()) + .post(isA(classOf[SparkListenerNodeBlacklistedForStage])) // Mark task 1 failed on exec1 -- this pushes the executor into the blacklist taskSetBlacklist.updateBlacklistForFailedTask( "hostA", exec = "exec1", index = 1, failureReason = "testing") + assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1")) -assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA")) verify(listenerBusMock).post( SparkListenerExecutorBlacklistedForStage(0, "exec1", 2, 0, attemptId)) + +assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA")) +verify(listenerBusMock, never()) + .post(isA(classOf[SparkListenerNodeBlacklistedForStage])) + // Mark one task as failed on exec2 -- not enough for any further blacklisting yet. 
taskSetBlacklist.updateBlacklistForFailedTask( "hostA", exec = "exec2", index = 0, failureReason = "testing") assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1")) + assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec2")) +verify(listenerBusMock, never()).post( + SparkListenerNodeBlacklistedForStage(0, "hostA", 2, 0, attemptId)) + assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA")) +verify(listenerBusMock, never()) + .post(isA(classOf[SparkListenerNodeBlacklistedForStage])) --- End diff -- the `verify` you added just above this is redundant given this one, right? I think you only need this one.
[GitHub] spark issue #20284: [SPARK-23103][core] Ensure correct sort order for negati...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20284 Even though we don't *know* of this causing a bug in 2.3, I still think we should merge it there, because there may be some case we aren't thinking of, and this is a relatively small, safe fix. So, I'm merging to master & 2.3.
[GitHub] spark issue #20138: [SPARK-20664][core] Delete stale application data from S...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20138 As RC1 failed and RC2 is going to be cut soon, I'm going to merge this to master & 2.3.
[GitHub] spark issue #20319: [SPARK-22884][ML][TESTS] ML test for StructuredStreaming...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20319 Jenkins, add to whitelist
[GitHub] spark pull request #20284: [SPARK-23103][core] Ensure correct sort order for...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20284#discussion_r162166090 --- Diff: common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java --- @@ -112,7 +114,8 @@ public void setup() throws Exception { t.key = "key" + i; t.id = "id" + i; t.name = "name" + RND.nextInt(MAX_ENTRIES); - t.num = RND.nextInt(MAX_ENTRIES); + // Force one item to have an integer value of zero to test the fix for SPARK-23103. + t.num = (i != 0) ? (int) RND.nextLong() : 0; --- End diff -- Why the change from `RND.nextInt(MAX_ENTRIES)`? This seems fine; it just seemed like you were more likely to stress collisions on this index before.
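A likely motivation for the change is that `RND.nextInt(MAX_ENTRIES)` only returns values in `[0, MAX_ENTRIES)`. It therefore never produces the negative values SPARK-23103 is about, whereas `(int) RND.nextLong()` covers the whole `int` range. For background, the sketch below shows one generic way to make string (or byte) order agree with signed numeric order: flip the sign bit before encoding as fixed-width hex. It is an illustration only, not necessarily how the kvstore encodes index keys.

```scala
// Generic technique (not Spark's kvstore encoding): order-preserving keys for Int.
object SortableIntKey {
  // Flipping the sign bit maps Int.MinValue..Int.MaxValue onto 0x00000000..0xffffffff
  // in order, so fixed-width hex strings sort the same way as the numbers.
  def encode(v: Int): String = f"${v ^ Int.MinValue}%08x"

  // For contrast: plain two's-complement hex sorts every negative value after every
  // non-negative one, because -1 becomes "ffffffff" while 0 becomes "00000000".
  def naiveEncode(v: Int): String = f"$v%08x"
}
```

For example, `encode(-1)` is `"7fffffff"` and `encode(0)` is `"80000000"`, so -1 correctly sorts before 0.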
[GitHub] spark issue #20203: [SPARK-22577] [core] executor page blacklist status shou...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20203 that sounds fine with me
[GitHub] spark issue #20203: [SPARK-22577] [core] executor page blacklist status shou...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20203 Yeah, I think it's probably fine to update the executors page that way -- let's at least see how it looks. Would the list include all stages ever blacklisted? Only those stages still running? The most recent 3 blacklisted stages? @attilapiros how about you take a shot at updating the executors page as well, so we can see what that looks like?
[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r162087946 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala --- @@ -128,13 +130,17 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int, } // Check if enough tasks have failed on the executor to blacklist it for the entire stage. -if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) { +val numFailures = execFailures.numUniqueTasksWithFailures +if (numFailures >= MAX_FAILURES_PER_EXEC_STAGE) { if (blacklistedExecs.add(exec)) { logInfo(s"Blacklisting executor ${exec} for stage $stageId") // This executor has been pushed into the blacklist for this stage. Let's check if it // pushes the whole node into the blacklist. val blacklistedExecutorsOnNode = execsWithFailuresOnNode.filter(blacklistedExecs.contains(_)) +val now = clock.getTimeMillis() +listenerBus.post( + SparkListenerExecutorBlacklistedForStage(now, exec, numFailures, stageId, stageAttemptId)) if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) { if (blacklistedNodes.add(host)) { logInfo(s"Blacklisting ${host} for stage $stageId") --- End diff -- yes that makes sense to me -- totally agree with your point about handling late updates. After all, another executor can get added to the node at any time.
[GitHub] spark issue #20203: [SPARK-22577] [core] executor page blacklist status shou...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20203 Attila attached some screenshots to the JIRA (showing what happens with both stage blacklisting and full application blacklisting). The only change here is to the page for a specific stage, so it seems clear that it's saying the executor is blacklisted for that same stage. OTOH, if we were to change the executors page as well, then you would need to put something to indicate which stage ... I'm not sure what you would put to be both useful and succinct.
[GitHub] spark issue #20138: [SPARK-20664][core] Delete stale application data from S...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20138 lgtm
[GitHub] spark issue #20203: [SPARK-22577] [core] executor page blacklist status shou...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20203 @ajbozarth maybe you have some thoughts on the UI, and whether it makes sense to put anything on the executors page? @CodingCat you also often have good UI suggestions :) thanks
[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r161884194 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala --- @@ -36,7 +36,9 @@ import org.apache.spark.util.Clock * [[TaskSetManager]] this class is designed only to be called from code with a lock on the * TaskScheduler (e.g. its event handlers). It should not be called from other threads. */ -private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int, val clock: Clock) +private[scheduler] class TaskSetBlacklist(private val listenerBus: LiveListenerBus, + val conf: SparkConf, val stageId: Int, --- End diff -- style: if it's multiline, put each param on its own line, double-indented
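The requested layout, shown on a hypothetical class. Stand-in types replace `LiveListenerBus`, `SparkConf`, and `Clock` so that the snippet compiles on its own. Each parameter goes on its own line with a four-space (double) indent, and the same rule applies to a method parameter list that wraps.

```scala
// Hypothetical class illustrating the multi-line parameter style.
class ExampleTaskSetTracker(
    private val postEvent: String => Unit,
    val maxFailures: Int,
    val stageId: Int,
    val stageAttemptId: Int) {

  // Wrapped method parameters follow the same rule: one per line, 4 spaces.
  def onTaskFailed(
      executorId: String,
      taskIndex: Int): Unit = {
    postEvent(s"stage $stageId.$stageAttemptId: task $taskIndex failed on $executorId")
  }
}
```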
[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r161885726 --- Diff: core/src/test/resources/HistoryServerExpectations/stage_blacklisting_for_stage_expectation.json --- @@ -0,0 +1,639 @@ +{ --- End diff -- nit: "stage" twice in the filename is confusing, how about just "blacklisting_for_stage_expectation.json"?
[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r161884916 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala --- @@ -128,13 +130,17 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int, } // Check if enough tasks have failed on the executor to blacklist it for the entire stage. -if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) { +val numFailures = execFailures.numUniqueTasksWithFailures +if (numFailures >= MAX_FAILURES_PER_EXEC_STAGE) { if (blacklistedExecs.add(exec)) { logInfo(s"Blacklisting executor ${exec} for stage $stageId") // This executor has been pushed into the blacklist for this stage. Let's check if it // pushes the whole node into the blacklist. val blacklistedExecutorsOnNode = execsWithFailuresOnNode.filter(blacklistedExecs.contains(_)) +val now = clock.getTimeMillis() +listenerBus.post( + SparkListenerExecutorBlacklistedForStage(now, exec, numFailures, stageId, stageAttemptId)) if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) { if (blacklistedNodes.add(host)) { logInfo(s"Blacklisting ${host} for stage $stageId") --- End diff -- If we're going to do this for executors, we should do it for nodes too. In the UI, you'd just show for each executor that it was blacklisted for the stage; I don't think you would need to distinguish whether it was blacklisted because of the entire node, or just the one executor was blacklisted.
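A hypothetical, simplified model of the per-stage bookkeeping that posts an event for the node as well as for the executor. The case classes only loosely mirror `SparkListenerExecutorBlacklistedForStage` and `SparkListenerNodeBlacklistedForStage`; for instance, they have no stage attempt id. The thresholds are plain parameters rather than Spark configs.

```scala
import scala.collection.mutable

// Hypothetical model; names and fields are illustrative, not Spark's API.
object StageBlacklistSketch {
  sealed trait Event
  final case class ExecutorBlacklistedForStage(
      time: Long, executorId: String, taskFailures: Int, stageId: Int) extends Event
  final case class NodeBlacklistedForStage(
      time: Long, host: String, blacklistedExecutors: Int, stageId: Int) extends Event

  final class Tracker(
      stageId: Int,
      maxFailuresPerExec: Int,
      maxFailedExecsPerNode: Int,
      clock: () => Long) {
    val events = mutable.ArrayBuffer.empty[Event]
    private val failedTasksByExec = mutable.HashMap.empty[String, mutable.HashSet[Int]]
    private val execsByHost = mutable.HashMap.empty[String, mutable.HashSet[String]]
    private val blacklistedExecs = mutable.HashSet.empty[String]
    private val blacklistedNodes = mutable.HashSet.empty[String]

    def taskFailed(host: String, exec: String, taskIndex: Int): Unit = {
      execsByHost.getOrElseUpdate(host, mutable.HashSet.empty[String]) += exec
      val failed = failedTasksByExec.getOrElseUpdate(exec, mutable.HashSet.empty[Int])
      failed += taskIndex
      // HashSet.add returns true only on first insertion, so each event is posted once.
      if (failed.size >= maxFailuresPerExec && blacklistedExecs.add(exec)) {
        val now = clock()
        events += ExecutorBlacklistedForStage(now, exec, failed.size, stageId)
        val blacklistedOnNode = execsByHost(host).count(e => blacklistedExecs.contains(e))
        if (blacklistedOnNode >= maxFailedExecsPerNode && blacklistedNodes.add(host)) {
          events += NodeBlacklistedForStage(now, host, blacklistedOnNode, stageId)
        }
      }
    }
  }
}
```

On the UI side, a node-level event could simply mark every executor on that host as blacklisted for the stage, which matches the suggestion not to distinguish the two cases.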
[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r161885207 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -211,6 +211,11 @@ private[spark] class AppStatusListener( updateBlackListStatus(event.executorId, true) } + override def onExecutorBlacklistedForStage( +event: SparkListenerExecutorBlacklistedForStage): Unit = { --- End diff -- double-indent this line (4 spaces)
[GitHub] spark issue #20236: [SPARK-23044] Error handling for jira assignment
Github user squito commented on the issue: https://github.com/apache/spark/pull/20236 @vanzin @jerryshao want to take another look? Now it:
* filters out "apache spark"
* lets you enter an arbitrary id
* if there's an error, just prompts again

Sample session: https://gist.github.com/squito/de73fbd0b9c00961377068b91283e04c
[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20138#discussion_r161660926 --- Diff: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala --- @@ -663,6 +665,95 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc freshUI.get.ui.store.job(0) } + test("clean up stale app information") { +val storeDir = Utils.createTempDir() +val conf = createTestConf().set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) +val provider = spy(new FsHistoryProvider(conf)) +val appId = "new1" + +// Write logs for two app attempts. +doReturn(1L).when(provider).getNewLastScanTime() +val attempt1 = newLogFile(appId, Some("1"), inProgress = false) +writeFile(attempt1, true, None, + SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("1")), + SparkListenerJobStart(0, 1L, Nil, null), + SparkListenerApplicationEnd(5L) + ) +val attempt2 = newLogFile(appId, Some("2"), inProgress = false) +writeFile(attempt2, true, None, + SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("2")), + SparkListenerJobStart(0, 1L, Nil, null), + SparkListenerApplicationEnd(5L) + ) +updateAndCheck(provider) { list => + assert(list.size === 1) + assert(list(0).id === appId) + assert(list(0).attempts.size === 2) +} + +// Load the app's UI. +val ui = provider.getAppUI(appId, Some("1")) +assert(ui.isDefined) + +// Delete the underlying log file for attempt 1 and rescan. The UI should go away, but since +// attempt 2 still exists, listing data should be there. +doReturn(2L).when(provider).getNewLastScanTime() +attempt1.delete() +updateAndCheck(provider) { list => + assert(list.size === 1) + assert(list(0).id === appId) + assert(list(0).attempts.size === 1) +} +assert(!ui.get.valid) +assert(provider.getAppUI(appId, None) === None) + +// Delete the second attempt's log file. Now everything should go away. 
+doReturn(3L).when(provider).getNewLastScanTime() +attempt2.delete() +updateAndCheck(provider) { list => + assert(list.isEmpty) +} + } + + test("SPARK-21571: clean up removes invalid history files") { +val clock = new ManualClock(TimeUnit.DAYS.toMillis(120)) +val conf = createTestConf().set("spark.history.fs.cleaner.maxAge", s"2d") +val provider = new FsHistoryProvider(conf, clock) { + override def getNewLastScanTime(): Long = clock.getTimeMillis() +} + +// Create 0-byte size inprogress and complete files +val logfile1 = newLogFile("emptyInprogressLogFile", None, inProgress = true) +logfile1.createNewFile() +logfile1.setLastModified(clock.getTimeMillis()) + +val logfile2 = newLogFile("emptyFinishedLogFile", None, inProgress = false) +logfile2.createNewFile() +logfile2.setLastModified(clock.getTimeMillis()) + +// Create an incomplete log file, has an end record but no start record. +val logfile3 = newLogFile("nonEmptyCorruptLogFile", None, inProgress = false) +writeFile(logfile3, true, None, SparkListenerApplicationEnd(0)) +logfile3.setLastModified(clock.getTimeMillis()) + +provider.checkForLogs() +provider.cleanLogs() +assert(new File(testDir.toURI).listFiles().size === 3) + +// Move the clock forward 1 day and scan the files again. They should still be there. +clock.advance(TimeUnit.DAYS.toMillis(1)) +provider.checkForLogs() +provider.cleanLogs() +assert(new File(testDir.toURI).listFiles().size === 3) + +// Move the clock forward another 2 days and scan the files again. This time the cleaner should +// pick up the invalid files and get rid of them. +clock.advance(TimeUnit.DAYS.toMillis(2)) +provider.checkForLogs() +provider.cleanLogs() +assert(new File(testDir.toURI).listFiles().size === 0) --- End diff -- I think you should add a case where one file starts out empty, say even for one full day, but then becomes valid before the expiration time, and make sure it does *not* get cleaned up. 
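To make the suggested test case concrete, here is a minimal Scala sketch that models the scan rules from the diff under review in memory. It assumes, hypothetically, that the cleaner drops any entry whose `lastProcessed` is older than the max age. `LogEntry`, `FakeFile`, and `ListingModel` are illustrative names, not `FsHistoryProvider` APIs. In this model, a file that starts empty but later parses into an application gets its timestamp refreshed and survives, while a file that stays empty is cleaned.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the listing/cleaner rules in the diff under review;
// names and rules are illustrative, not the real FsHistoryProvider API.
final case class LogEntry(path: String, lastProcessed: Long, appId: Option[String], fileSize: Long)

// What a scan sees on disk: the file size and the app id it parses to, if any.
final case class FakeFile(size: Long, appId: Option[String])

final class ListingModel(maxAgeMs: Long) {
  val listing: mutable.Map[String, LogEntry] = mutable.Map.empty

  /** One scan pass at time `now` over the files currently on disk. */
  def scan(files: Map[String, FakeFile], now: Long): Unit =
    files.foreach { case (path, f) =>
      listing.get(path) match {
        case Some(info) if info.fileSize < f.size =>
          // The log grew: re-parse it and record the scan time (and app id, if any).
          listing(path) = LogEntry(path, now, f.appId, f.size)
        case Some(info) =>
          // Unchanged: only entries that parsed into an app get their timestamp refreshed.
          if (info.appId.isDefined) listing(path) = info.copy(lastProcessed = now)
        case None =>
          // First sighting: track the file even if it cannot be parsed yet.
          listing(path) = LogEntry(path, now, if (f.size > 0) f.appId else None, f.size)
      }
    }

  /** Drops entries not refreshed within maxAgeMs and returns their paths. */
  def clean(now: Long): Set[String] = {
    val stale = listing.values.filter(_.lastProcessed < now - maxAgeMs).map(_.path).toSet
    stale.foreach(path => listing.remove(path))
    stale
  }
}
```

With a 2-day max age, a file that is empty on day 0 and valid by day 1 is still listed on day 3, while a file that stays empty is removed.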
[GitHub] spark issue #20056: [SPARK-22878] [CORE] Count totalDroppedEvents for LiveLi...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20056 I see that `LiveListenerBus.droppedEventsCounter` and `lastReportTimestamp` are unused, so it certainly makes sense to clean them up one way or the other. But that might mean we should delete them, not that we necessarily need to do something else with them. I could see an argument that there are already monitoring systems hooked up to the old metric, ["numEventsDropped"](https://github.com/apache/spark/blob/718bbc939037929ef5b8f4b4fe10aadfbab4408e/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L266), so maybe we should bring back the total with that metric. But do you really want even more logging of the total, beyond the logging from each queue? To me it seems like it would only be more confusing.
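A hedged sketch of the point about totals: if each queue keeps its own drop counter, a single total (for example, one backing the old `numEventsDropped` metric) can be derived by summing, with no extra logging. `QueueDropCounters` and the queue names used with it are hypothetical; this is not the `LiveListenerBus` implementation.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical sketch: per-queue drop counters, with the overall total derived on demand
// instead of being tracked (or logged) separately.
final class QueueDropCounters(queueNames: Seq[String]) {
  private val counters: Map[String, AtomicLong] =
    queueNames.map(name => name -> new AtomicLong(0L)).toMap

  /** Records one dropped event for the given queue. */
  def onDropped(queue: String): Unit = counters(queue).incrementAndGet()

  /** Dropped events for a single queue. */
  def dropped(queue: String): Long = counters(queue).get()

  /** Total across all queues, e.g. for a single "numEventsDropped"-style metric. */
  def totalDropped: Long = counters.values.map(_.get()).sum
}
```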
[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20138#discussion_r161099778 --- Diff: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala --- @@ -663,6 +665,95 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc freshUI.get.ui.store.job(0) } + test("clean up stale app information") { +val storeDir = Utils.createTempDir() +val conf = createTestConf().set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) +val provider = spy(new FsHistoryProvider(conf)) +val appId = "new1" + +// Write logs for two app attempts. +doReturn(1L).when(provider).getNewLastScanTime() +val attempt1 = newLogFile(appId, Some("1"), inProgress = false) +writeFile(attempt1, true, None, + SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("1")), + SparkListenerJobStart(0, 1L, Nil, null), + SparkListenerApplicationEnd(5L) + ) +val attempt2 = newLogFile(appId, Some("2"), inProgress = false) +writeFile(attempt2, true, None, + SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("2")), + SparkListenerJobStart(0, 1L, Nil, null), + SparkListenerApplicationEnd(5L) + ) +updateAndCheck(provider) { list => + assert(list.size === 1) + assert(list(0).id === appId) + assert(list(0).attempts.size === 2) +} + +// Load the app's UI. +val ui = provider.getAppUI(appId, Some("1")) +assert(ui.isDefined) + +// Delete the underlying log file for attempt 1 and rescan. The UI should go away, but since +// attempt 2 still exists, listing data should be there. +doReturn(2L).when(provider).getNewLastScanTime() +attempt1.delete() +updateAndCheck(provider) { list => + assert(list.size === 1) + assert(list(0).id === appId) + assert(list(0).attempts.size === 1) +} +assert(!ui.get.valid) +assert(provider.getAppUI(appId, None) === None) + +// Delete the second attempt's log file. Now everything should go away. 
+doReturn(3L).when(provider).getNewLastScanTime() +attempt2.delete() +updateAndCheck(provider) { list => + assert(list.isEmpty) +} + } + + test("SPARK-21571: clean up removes invalid history files") { +val clock = new ManualClock(TimeUnit.DAYS.toMillis(120)) --- End diff -- Just curious: why start at 120 days? (Not that it matters...)
[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20138#discussion_r161099310 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -834,6 +906,9 @@ private[history] case class FsHistoryProviderMetadata( private[history] case class LogInfo( @KVIndexParam logPath: String, +@KVIndexParam("lastProcessed") lastProcessed: Long, +appId: Option[String], --- End diff -- Also, please add a comment here explaining why appId is an Option, since that is unexpected.
[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20138#discussion_r161098356 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -544,73 +621,75 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) bus.addListener(listener) replay(fileStatus, bus, eventsFilter = eventsFilter) -listener.applicationInfo.foreach { app => - // Invalidate the existing UI for the reloaded app attempt, if any. See LoadedAppUI for a - // discussion on the UI lifecycle. - synchronized { -activeUIs.get((app.info.id, app.attempts.head.info.attemptId)).foreach { ui => - ui.invalidate() - ui.ui.store.close() +val (appId, attemptId) = listener.applicationInfo match { + case Some(app) => +// Invalidate the existing UI for the reloaded app attempt, if any. See LoadedAppUI for a +// discussion on the UI lifecycle. +synchronized { + activeUIs.get((app.info.id, app.attempts.head.info.attemptId)).foreach { ui => +ui.invalidate() +ui.ui.store.close() + } } - } - addListing(app) +addListing(app) +(Some(app.info.id), app.attempts.head.info.attemptId) + + case _ => +(None, None) --- End diff -- I think a comment here would help, explaining that writing an entry with no appId marks this log file as eligible for automatic cleanup if it's still in that state after max_log_age (if I understood correctly).
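One possible shape for the requested comments, as a simplified Scala sketch. It assumes the fourth `LogInfo` field is the attempt id (that field is not visible in the quoted diff) and drops the `@KVIndexParam` annotations so the snippet compiles on its own. `ParsedApp` and `LogInfoSketch` are hypothetical names.

```scala
// Hypothetical, simplified mirror of the LogInfo entry in the diff; KVStore index
// annotations are omitted and the fourth field is assumed to be the attempt id.
final case class LogInfo(
    logPath: String,
    lastProcessed: Long,
    // None when the log could not be parsed into an application (for example, it is empty
    // or has no start event). While the file is unchanged, such entries are not refreshed
    // by later scans, so the cleaner can delete them once they exceed the max log age.
    appId: Option[String],
    attemptId: Option[String],
    fileSize: Long)

// Hypothetical stand-in for what replaying a log yields when it succeeds.
final case class ParsedApp(id: String, attemptId: Option[String])

object LogInfoSketch {
  def toLogInfo(path: String, scanTime: Long, size: Long, app: Option[ParsedApp]): LogInfo = {
    val (appId, attemptId) = app match {
      case Some(a) => (Option(a.id), a.attemptId)
      // Nothing could be parsed: record the file anyway, with no appId, so it stays
      // eligible for automatic cleanup if it is still in this state after the max age.
      case None => (Option.empty[String], Option.empty[String])
    }
    LogInfo(path, scanTime, appId, attemptId, size)
  }
}
```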
[GitHub] spark issue #20138: [SPARK-20664][core] Delete stale application data from S...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20138 Well, perhaps I misrepresented this: you still need to turn on event log cleaning explicitly with the old option, "spark.history.fs.cleaner.enabled". This just doesn't include the "aggressive" option that was originally proposed by @ericvandenbergfb.
[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20138#discussion_r161095561 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -405,49 +404,70 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { val newLastScanTime = getNewLastScanTime() logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") - // scan for modified applications, replay and merge them - val logInfos = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil) + + val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil) .filter { entry => !entry.isDirectory() && // FsHistoryProvider generates a hidden file which can't be read. Accidentally // reading a garbage file is safe, but we would log an error which can be scary to // the end-user. !entry.getPath().getName().startsWith(".") && -SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) && -recordedFileSize(entry.getPath()) < entry.getLen() +SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) +} +.filter { entry => + try { +val info = listing.read(classOf[LogInfo], entry.getPath().toString()) +if (info.fileSize < entry.getLen()) { + // Log size has changed, it should be parsed. + true +} else { + // If the SHS view has a valid application, update the time the file was last seen so + // that the entry is not deleted from the SHS listing. + if (info.appId.isDefined) { +listing.write(info.copy(lastProcessed = newLastScanTime)) + } + false +} + } catch { +case _: NoSuchElementException => + // If the file is currently not being tracked by the SHS, add an entry for it and try + // to parse it. This will allow the cleaner code to detect the file as stale later on + // if it was not possible to parse it. 
+ listing.write(LogInfo(entry.getPath().toString(), newLastScanTime, None, None, +entry.getLen())) + entry.getLen() > 0 + } } .sortWith { case (entry1, entry2) => entry1.getModificationTime() > entry2.getModificationTime() } - if (logInfos.nonEmpty) { -logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}") + if (updated.nonEmpty) { +logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}") } - var tasks = mutable.ListBuffer[Future[_]]() - - try { -for (file <- logInfos) { - tasks += replayExecutor.submit(new Runnable { -override def run(): Unit = mergeApplicationListing(file) + val tasks = updated.map { entry => +try { + replayExecutor.submit(new Runnable { +override def run(): Unit = mergeApplicationListing(entry, newLastScanTime) }) +} catch { + // let the iteration over logInfos break, since an exception on --- End diff -- And actually, you've moved the try/catch, so this is no longer true: you'll continue to submit all the remaining tasks if one throws an exception. (Though I'm not really sure why the old code did it that way...)
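The behavioral difference raised here can be sketched in isolation. In the assumed setup below, `submit` stands in for `replayExecutor.submit` and may throw, for example a `RejectedExecutionException`. `SubmitPatterns` and its methods are hypothetical names that only illustrate where the try/catch sits.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

// Hypothetical sketch contrasting the two placements of the try/catch.
object SubmitPatterns {
  /** Old shape: one try around the loop, so the first failure skips all remaining items. */
  def submitUntilFailure(items: Seq[String], submit: String => Unit): List[String] = {
    val submitted = ListBuffer[String]()
    try {
      for (item <- items) {
        submit(item)
        submitted += item
      }
    } catch {
      case NonFatal(_) => // stop iterating; later items are never submitted
    }
    submitted.toList
  }

  /** New shape: try/catch inside the map, so one failure does not stop the rest. */
  def submitEach(items: Seq[String], submit: String => Unit): Seq[String] =
    items.flatMap { item =>
      try {
        submit(item)
        Some(item)
      } catch {
        case NonFatal(_) => None
      }
    }
}
```

If a rejected submission means the executor cannot take more work at all, the old shape avoids repeated failures; the new shape keeps trying every entry.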
[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20138#discussion_r161082423 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -405,49 +404,70 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { val newLastScanTime = getNewLastScanTime() logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") - // scan for modified applications, replay and merge them - val logInfos = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil) + + val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil) .filter { entry => !entry.isDirectory() && // FsHistoryProvider generates a hidden file which can't be read. Accidentally // reading a garbage file is safe, but we would log an error which can be scary to // the end-user. !entry.getPath().getName().startsWith(".") && -SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) && -recordedFileSize(entry.getPath()) < entry.getLen() +SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) +} +.filter { entry => + try { +val info = listing.read(classOf[LogInfo], entry.getPath().toString()) +if (info.fileSize < entry.getLen()) { + // Log size has changed, it should be parsed. + true +} else { + // If the SHS view has a valid application, update the time the file was last seen so + // that the entry is not deleted from the SHS listing. + if (info.appId.isDefined) { +listing.write(info.copy(lastProcessed = newLastScanTime)) + } + false +} + } catch { +case _: NoSuchElementException => + // If the file is currently not being tracked by the SHS, add an entry for it and try + // to parse it. This will allow the cleaner code to detect the file as stale later on + // if it was not possible to parse it. 
+ listing.write(LogInfo(entry.getPath().toString(), newLastScanTime, None, None, +entry.getLen())) + entry.getLen() > 0 + } } .sortWith { case (entry1, entry2) => entry1.getModificationTime() > entry2.getModificationTime() } - if (logInfos.nonEmpty) { -logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}") + if (updated.nonEmpty) { +logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}") } - var tasks = mutable.ListBuffer[Future[_]]() - - try { -for (file <- logInfos) { - tasks += replayExecutor.submit(new Runnable { -override def run(): Unit = mergeApplicationListing(file) + val tasks = updated.map { entry => +try { + replayExecutor.submit(new Runnable { +override def run(): Unit = mergeApplicationListing(entry, newLastScanTime) }) +} catch { + // let the iteration over logInfos break, since an exception on --- End diff -- You've renamed `logInfos` to `updated`, so this comment should be updated to match.
[GitHub] spark issue #20236: [SPARK-23044] Error handling for jira assignment
Github user squito commented on the issue: https://github.com/apache/spark/pull/20236 @vanzin what exactly are you looking for? The one thing that would be easy is letting you enter an arbitrary jira id (no name searching or anything); does that sound OK? I guess this bug isn't really a major issue, so there's no urgency in getting this in, and I can add to this PR.
[GitHub] spark issue #20236: [SPARK-23044] Error handling for jira assignment
Github user squito commented on the issue: https://github.com/apache/spark/pull/20236 @jerryshao this should fix it, but I don't have anything to merge to test it out. I'd appreciate it if someone could try it before we merge.
[GitHub] spark pull request #20236: [SPARK-23044] Error handling for jira assignment
GitHub user squito opened a pull request: https://github.com/apache/spark/pull/20236 [SPARK-23044] Error handling for jira assignment ## What changes were proposed in this pull request? If the selected user isn't a contributor yet, or if any other unexpected error occurs, just don't assign the jira. ## How was this patch tested? I couldn't really test the error case; I just did some testing of similar code in a Python shell. I haven't run a merge yet. You can merge this pull request into a Git repository by running: $ git pull https://github.com/squito/spark SPARK-23044 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20236.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20236 commit 8c4cc61c2a06b310480fafb3b28067a6f961816a Author: Imran Rashid <irashid@...> Date: 2018-01-11T15:42:16Z [SPARK-23044] Error handling for jira assignment
[GitHub] spark issue #20056: [SPARK-22878] [CORE] Count totalDroppedEvents for LiveLi...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20056 I have the same question as @jiangxb1987: what is the situation where you'd use this metric? The jira doesn't say either. It seems like existing metrics mostly cover this?
[GitHub] spark issue #20013: [SPARK-20657][core] Speed up rendering of the stages pag...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20013 lgtm
[GitHub] spark pull request #20013: [SPARK-20657][core] Speed up rendering of the sta...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20013#discussion_r160017859 --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala --- @@ -119,118 +121,115 @@ private class LiveTask( import LiveEntityHelpers._ - private var recordedMetrics: v1.TaskMetrics = null + private var metrics: MetricsTracker = new MetricsTracker() var errorMessage: Option[String] = None /** * Update the metrics for the task and return the difference between the previous and new * values. */ - def updateMetrics(metrics: TaskMetrics): v1.TaskMetrics = { + def updateMetrics(metrics: TaskMetrics): MetricsTracker = { if (metrics != null) { - val old = recordedMetrics - recordedMetrics = new v1.TaskMetrics( -metrics.executorDeserializeTime, -metrics.executorDeserializeCpuTime, -metrics.executorRunTime, -metrics.executorCpuTime, -metrics.resultSize, -metrics.jvmGCTime, -metrics.resultSerializationTime, -metrics.memoryBytesSpilled, -metrics.diskBytesSpilled, -metrics.peakExecutionMemory, -new v1.InputMetrics( - metrics.inputMetrics.bytesRead, - metrics.inputMetrics.recordsRead), -new v1.OutputMetrics( - metrics.outputMetrics.bytesWritten, - metrics.outputMetrics.recordsWritten), -new v1.ShuffleReadMetrics( - metrics.shuffleReadMetrics.remoteBlocksFetched, - metrics.shuffleReadMetrics.localBlocksFetched, - metrics.shuffleReadMetrics.fetchWaitTime, - metrics.shuffleReadMetrics.remoteBytesRead, - metrics.shuffleReadMetrics.remoteBytesReadToDisk, - metrics.shuffleReadMetrics.localBytesRead, - metrics.shuffleReadMetrics.recordsRead), -new v1.ShuffleWriteMetrics( - metrics.shuffleWriteMetrics.bytesWritten, - metrics.shuffleWriteMetrics.writeTime, - metrics.shuffleWriteMetrics.recordsWritten)) - if (old != null) calculateMetricsDelta(recordedMetrics, old) else recordedMetrics + val old = this.metrics + val newMetrics = new MetricsTracker() + newMetrics.executorDeserializeTime = metrics.executorDeserializeTime + 
newMetrics.executorDeserializeCpuTime = metrics.executorDeserializeCpuTime + newMetrics.executorRunTime = metrics.executorRunTime + newMetrics.executorCpuTime = metrics.executorCpuTime + newMetrics.resultSize = metrics.resultSize + newMetrics.jvmGcTime = metrics.jvmGCTime + newMetrics.resultSerializationTime = metrics.resultSerializationTime + newMetrics.memoryBytesSpilled = metrics.memoryBytesSpilled + newMetrics.diskBytesSpilled = metrics.diskBytesSpilled + newMetrics.peakExecutionMemory = metrics.peakExecutionMemory + newMetrics.inputBytesRead = metrics.inputMetrics.bytesRead + newMetrics.inputRecordsRead = metrics.inputMetrics.recordsRead + newMetrics.outputBytesWritten = metrics.outputMetrics.bytesWritten + newMetrics.outputRecordsWritten = metrics.outputMetrics.recordsWritten + newMetrics.shuffleRemoteBlocksFetched = metrics.shuffleReadMetrics.remoteBlocksFetched + newMetrics.shuffleLocalBlocksFetched = metrics.shuffleReadMetrics.localBlocksFetched + newMetrics.shuffleFetchWaitTime = metrics.shuffleReadMetrics.fetchWaitTime + newMetrics.shuffleRemoteBytesRead = metrics.shuffleReadMetrics.remoteBytesRead + newMetrics.shuffleRemoteBytesReadToDisk = metrics.shuffleReadMetrics.remoteBytesReadToDisk + newMetrics.shuffleLocalBytesRead = metrics.shuffleReadMetrics.localBytesRead + newMetrics.shuffleRecordsRead = metrics.shuffleReadMetrics.recordsRead + newMetrics.shuffleBytesWritten = metrics.shuffleWriteMetrics.bytesWritten + newMetrics.shuffleWriteTime = metrics.shuffleWriteMetrics.writeTime + newMetrics.shuffleRecordsWritten = metrics.shuffleWriteMetrics.recordsWritten + + this.metrics = newMetrics + if (old.executorDeserializeTime >= 0L) { +old.subtract(newMetrics) +old + } else { +newMetrics + } } else { null } } - /** - * Return a new TaskMetrics object containing the delta of the various fields of the given - * metrics objects. This is currently targeted at updating stage data, so it does not - * necessarily calculate deltas for all the fields. 
- */ - private def calculateMetricsDelta( - metrics: v1.TaskMetrics, - old: v1.TaskMetrics): v1.TaskMetrics = { -
[GitHub] spark pull request #20013: [SPARK-20657][core] Speed up rendering of the sta...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20013#discussion_r159988995 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -110,107 +114,240 @@ private[spark] class AppStatusStore( if (details) stageWithDetails(stage) else stage } + def taskCount(stageId: Int, stageAttemptId: Int): Long = { +store.count(classOf[TaskDataWrapper], "stage", Array(stageId, stageAttemptId)) + } + + def localitySummary(stageId: Int, stageAttemptId: Int): Map[String, Long] = { +store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality + } + + /** + * Calculates a summary of the task metrics for the given stage attempt, returning the + * requested quantiles for the recorded metrics. + * + * This method can be expensive if the requested quantiles are not cached; the method + * will only cache certain quantiles (every 0.05 step), so it's recommended to stick to + * those to avoid expensive scans of all task data. + */ def taskSummary( stageId: Int, stageAttemptId: Int, - quantiles: Array[Double]): v1.TaskMetricDistributions = { - -val stage = Array(stageId, stageAttemptId) - -val rawMetrics = store.view(classOf[TaskDataWrapper]) - .index("stage") - .first(stage) - .last(stage) - .asScala - .flatMap(_.info.taskMetrics) - .toList - .view - -def metricQuantiles(f: v1.TaskMetrics => Double): IndexedSeq[Double] = - Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles) - -// We need to do a lot of similar munging to nested metrics here. For each one, -// we want (a) extract the values for nested metrics (b) make a distribution for each metric -// (c) shove the distribution into the right field in our return type and (d) only return -// a result if the option is defined for any of the tasks. MetricHelper is a little util -// to make it a little easier to deal w/ all of the nested options. 
Mostly it lets us just -// implement one "build" method, which just builds the quantiles for each field. - -val inputMetrics = - new MetricHelper[v1.InputMetrics, v1.InputMetricDistributions](rawMetrics, quantiles) { -def getSubmetrics(raw: v1.TaskMetrics): v1.InputMetrics = raw.inputMetrics - -def build: v1.InputMetricDistributions = new v1.InputMetricDistributions( - bytesRead = submetricQuantiles(_.bytesRead), - recordsRead = submetricQuantiles(_.recordsRead) -) - }.build - -val outputMetrics = - new MetricHelper[v1.OutputMetrics, v1.OutputMetricDistributions](rawMetrics, quantiles) { -def getSubmetrics(raw: v1.TaskMetrics): v1.OutputMetrics = raw.outputMetrics - -def build: v1.OutputMetricDistributions = new v1.OutputMetricDistributions( - bytesWritten = submetricQuantiles(_.bytesWritten), - recordsWritten = submetricQuantiles(_.recordsWritten) -) - }.build - -val shuffleReadMetrics = - new MetricHelper[v1.ShuffleReadMetrics, v1.ShuffleReadMetricDistributions](rawMetrics, -quantiles) { -def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleReadMetrics = - raw.shuffleReadMetrics - -def build: v1.ShuffleReadMetricDistributions = new v1.ShuffleReadMetricDistributions( - readBytes = submetricQuantiles { s => s.localBytesRead + s.remoteBytesRead }, - readRecords = submetricQuantiles(_.recordsRead), - remoteBytesRead = submetricQuantiles(_.remoteBytesRead), - remoteBytesReadToDisk = submetricQuantiles(_.remoteBytesReadToDisk), - remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched), - localBlocksFetched = submetricQuantiles(_.localBlocksFetched), - totalBlocksFetched = submetricQuantiles { s => -s.localBlocksFetched + s.remoteBlocksFetched - }, - fetchWaitTime = submetricQuantiles(_.fetchWaitTime) -) - }.build - -val shuffleWriteMetrics = - new MetricHelper[v1.ShuffleWriteMetrics, v1.ShuffleWriteMetricDistributions](rawMetrics, -quantiles) { -def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleWriteMetrics = - raw.shuffleWriteMetrics - -def build: 
v1.ShuffleWriteMetricDistributions = new v1.ShuffleWriteMetricDistributions( - writeBytes = submetricQuantiles(_.bytesWritten), - writeRecords = submetricQuantiles(_.recordsWritten),
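The new doc comment says only quantiles on a 0.05 grid are cached. Below is a hedged Scala sketch of how a caller might check that a requested quantile falls on that grid, plus a simple index-based quantile over sorted values. Neither is the `AppStatusStore` or `Distribution` implementation, `QuantileSketch` is a hypothetical name, and the floating-point tolerance is an arbitrary assumption.

```scala
// Hypothetical helpers; not Spark's AppStatusStore or Distribution code.
object QuantileSketch {
  // Step of the quantiles the doc comment says are cached (0.0, 0.05, ..., 1.0).
  private val Step = 0.05
  // Tolerance for floating-point error when dividing by the step (assumed value).
  private val Eps = 1e-9

  /** True if `q` falls on the 0.05 grid, i.e. it could be served from the cache. */
  def isOnCachedGrid(q: Double): Boolean = {
    val scaled = q / Step
    q >= 0.0 && q <= 1.0 && math.abs(scaled - math.rint(scaled)) < Eps
  }

  /** Index-based quantiles over the sorted values (a simple illustrative method). */
  def quantiles(values: Seq[Long], qs: Seq[Double]): Seq[Long] = {
    require(values.nonEmpty, "need at least one value")
    val sorted = values.sorted.toIndexedSeq
    qs.map(q => sorted(math.min((q * sorted.length).toInt, sorted.length - 1)))
  }
}
```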
[GitHub] spark pull request #20013: [SPARK-20657][core] Speed up rendering of the sta...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20013#discussion_r159992789 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -110,107 +114,240 @@ private[spark] class AppStatusStore( if (details) stageWithDetails(stage) else stage } + def taskCount(stageId: Int, stageAttemptId: Int): Long = { +store.count(classOf[TaskDataWrapper], "stage", Array(stageId, stageAttemptId)) + } + + def localitySummary(stageId: Int, stageAttemptId: Int): Map[String, Long] = { +store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality + } + + /** + * Calculates a summary of the task metrics for the given stage attempt, returning the + * requested quantiles for the recorded metrics. + * + * This method can be expensive if the requested quantiles are not cached; the method + * will only cache certain quantiles (every 0.05 step), so it's recommended to stick to + * those to avoid expensive scans of all task data. + */ def taskSummary( stageId: Int, stageAttemptId: Int, - quantiles: Array[Double]): v1.TaskMetricDistributions = { - -val stage = Array(stageId, stageAttemptId) - -val rawMetrics = store.view(classOf[TaskDataWrapper]) - .index("stage") - .first(stage) - .last(stage) - .asScala - .flatMap(_.info.taskMetrics) - .toList - .view - -def metricQuantiles(f: v1.TaskMetrics => Double): IndexedSeq[Double] = - Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles) - -// We need to do a lot of similar munging to nested metrics here. For each one, -// we want (a) extract the values for nested metrics (b) make a distribution for each metric -// (c) shove the distribution into the right field in our return type and (d) only return -// a result if the option is defined for any of the tasks. MetricHelper is a little util -// to make it a little easier to deal w/ all of the nested options. 
Mostly it lets us just -// implement one "build" method, which just builds the quantiles for each field. - -val inputMetrics = - new MetricHelper[v1.InputMetrics, v1.InputMetricDistributions](rawMetrics, quantiles) { -def getSubmetrics(raw: v1.TaskMetrics): v1.InputMetrics = raw.inputMetrics - -def build: v1.InputMetricDistributions = new v1.InputMetricDistributions( - bytesRead = submetricQuantiles(_.bytesRead), - recordsRead = submetricQuantiles(_.recordsRead) -) - }.build - -val outputMetrics = - new MetricHelper[v1.OutputMetrics, v1.OutputMetricDistributions](rawMetrics, quantiles) { -def getSubmetrics(raw: v1.TaskMetrics): v1.OutputMetrics = raw.outputMetrics - -def build: v1.OutputMetricDistributions = new v1.OutputMetricDistributions( - bytesWritten = submetricQuantiles(_.bytesWritten), - recordsWritten = submetricQuantiles(_.recordsWritten) -) - }.build - -val shuffleReadMetrics = - new MetricHelper[v1.ShuffleReadMetrics, v1.ShuffleReadMetricDistributions](rawMetrics, -quantiles) { -def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleReadMetrics = - raw.shuffleReadMetrics - -def build: v1.ShuffleReadMetricDistributions = new v1.ShuffleReadMetricDistributions( - readBytes = submetricQuantiles { s => s.localBytesRead + s.remoteBytesRead }, - readRecords = submetricQuantiles(_.recordsRead), - remoteBytesRead = submetricQuantiles(_.remoteBytesRead), - remoteBytesReadToDisk = submetricQuantiles(_.remoteBytesReadToDisk), - remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched), - localBlocksFetched = submetricQuantiles(_.localBlocksFetched), - totalBlocksFetched = submetricQuantiles { s => -s.localBlocksFetched + s.remoteBlocksFetched - }, - fetchWaitTime = submetricQuantiles(_.fetchWaitTime) -) - }.build - -val shuffleWriteMetrics = - new MetricHelper[v1.ShuffleWriteMetrics, v1.ShuffleWriteMetricDistributions](rawMetrics, -quantiles) { -def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleWriteMetrics = - raw.shuffleWriteMetrics - -def build: 
v1.ShuffleWriteMetricDistributions = new v1.ShuffleWriteMetricDistributions( - writeBytes = submetricQuantiles(_.bytesWritten), - writeRecords = submetricQuantiles(_.recordsWritten),
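The removed comment describes steps (a) through (d) for nested metrics. Here is a simplified sketch of that idea with hypothetical types, where a task's input metrics may be absent and the result is `None` when no task reported them. It illustrates the described steps only; it is not the removed `MetricHelper` code.

```scala
// Hypothetical simplified types and helpers illustrating the (a)-(d) steps.
object NestedMetricSketch {
  final case class InputMetrics(bytesRead: Long, recordsRead: Long)
  // In this sketch a task may lack input metrics entirely.
  final case class TaskMetrics(runTime: Long, input: Option[InputMetrics])

  private def quantiles(values: IndexedSeq[Long], qs: Seq[Double]): IndexedSeq[Long] = {
    val sorted = values.sorted
    qs.map(q => sorted(math.min((q * sorted.length).toInt, sorted.length - 1))).toIndexedSeq
  }

  /**
   * (a) extract one nested field per task, (b)/(c) turn the values into quantiles, and
   * (d) return None when no task reported the nested metrics at all.
   */
  def submetricQuantiles(tasks: Seq[TaskMetrics], qs: Seq[Double])(
      field: InputMetrics => Long): Option[IndexedSeq[Long]] = {
    val values = tasks.flatMap(_.input).map(field).toIndexedSeq
    if (values.isEmpty) None else Some(quantiles(values, qs))
  }
}
```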
[GitHub] spark pull request #20013: [SPARK-20657][core] Speed up rendering of the sta...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20013#discussion_r159990658
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -110,107 +114,240 @@ private[spark] class AppStatusStore(
     if (details) stageWithDetails(stage) else stage
   }
 
+  def taskCount(stageId: Int, stageAttemptId: Int): Long = {
+    store.count(classOf[TaskDataWrapper], "stage", Array(stageId, stageAttemptId))
+  }
+
+  def localitySummary(stageId: Int, stageAttemptId: Int): Map[String, Long] = {
+    store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
+  }
+
+  /**
+   * Calculates a summary of the task metrics for the given stage attempt, returning the
+   * requested quantiles for the recorded metrics.
+   *
+   * This method can be expensive if the requested quantiles are not cached; the method
+   * will only cache certain quantiles (every 0.05 step), so it's recommended to stick to
+   * those to avoid expensive scans of all task data.
+   */
   def taskSummary(
       stageId: Int,
       stageAttemptId: Int,
-      quantiles: Array[Double]): v1.TaskMetricDistributions = {
-
-    val stage = Array(stageId, stageAttemptId)
-
-    val rawMetrics = store.view(classOf[TaskDataWrapper])
-      .index("stage")
-      .first(stage)
-      .last(stage)
-      .asScala
-      .flatMap(_.info.taskMetrics)
-      .toList
-      .view
-
-    def metricQuantiles(f: v1.TaskMetrics => Double): IndexedSeq[Double] =
-      Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles)
-
-    // We need to do a lot of similar munging to nested metrics here. For each one,
-    // we want (a) extract the values for nested metrics (b) make a distribution for each metric
-    // (c) shove the distribution into the right field in our return type and (d) only return
-    // a result if the option is defined for any of the tasks. MetricHelper is a little util
-    // to make it a little easier to deal w/ all of the nested options. Mostly it lets us just
-    // implement one "build" method, which just builds the quantiles for each field.
-
-    val inputMetrics =
-      new MetricHelper[v1.InputMetrics, v1.InputMetricDistributions](rawMetrics, quantiles) {
-        def getSubmetrics(raw: v1.TaskMetrics): v1.InputMetrics = raw.inputMetrics
-
-        def build: v1.InputMetricDistributions = new v1.InputMetricDistributions(
-          bytesRead = submetricQuantiles(_.bytesRead),
-          recordsRead = submetricQuantiles(_.recordsRead)
-        )
-      }.build
-
-    val outputMetrics =
-      new MetricHelper[v1.OutputMetrics, v1.OutputMetricDistributions](rawMetrics, quantiles) {
-        def getSubmetrics(raw: v1.TaskMetrics): v1.OutputMetrics = raw.outputMetrics
-
-        def build: v1.OutputMetricDistributions = new v1.OutputMetricDistributions(
-          bytesWritten = submetricQuantiles(_.bytesWritten),
-          recordsWritten = submetricQuantiles(_.recordsWritten)
-        )
-      }.build
-
-    val shuffleReadMetrics =
-      new MetricHelper[v1.ShuffleReadMetrics, v1.ShuffleReadMetricDistributions](rawMetrics,
-        quantiles) {
-        def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleReadMetrics =
-          raw.shuffleReadMetrics
-
-        def build: v1.ShuffleReadMetricDistributions = new v1.ShuffleReadMetricDistributions(
-          readBytes = submetricQuantiles { s => s.localBytesRead + s.remoteBytesRead },
-          readRecords = submetricQuantiles(_.recordsRead),
-          remoteBytesRead = submetricQuantiles(_.remoteBytesRead),
-          remoteBytesReadToDisk = submetricQuantiles(_.remoteBytesReadToDisk),
-          remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched),
-          localBlocksFetched = submetricQuantiles(_.localBlocksFetched),
-          totalBlocksFetched = submetricQuantiles { s =>
-            s.localBlocksFetched + s.remoteBlocksFetched
-          },
-          fetchWaitTime = submetricQuantiles(_.fetchWaitTime)
-        )
-      }.build
-
-    val shuffleWriteMetrics =
-      new MetricHelper[v1.ShuffleWriteMetrics, v1.ShuffleWriteMetricDistributions](rawMetrics,
-        quantiles) {
-        def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleWriteMetrics =
-          raw.shuffleWriteMetrics
-
-        def build: v1.ShuffleWriteMetricDistributions = new v1.ShuffleWriteMetricDistributions(
-          writeBytes = submetricQuantiles(_.bytesWritten),
-          writeRecords = submetricQuantiles(_.recordsWritten),
[GitHub] spark issue #20082: [SPARK-22897][CORE]: Expose stageAttemptId in TaskContex...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20082 sorry this is a little late, but lgtm too. agree with the points above about leaving the old name deprecated and moving to the new name --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19992: [SPARK-22805][CORE] Use StorageLevel aliases in event lo...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19992 change is fine, but from the discussion on the jira I'm not sure this is really worth it; the gain seems pretty small after the other fix in 2.3.
[GitHub] spark pull request #19992: [SPARK-22805][CORE] Use StorageLevel aliases in e...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19992#discussion_r159812291
--- Diff: core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala ---
@@ -2022,12 +1947,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |  "Port": 300
       |},
       |"Block ID": "rdd_0_0",
-      |"Storage Level": {
--- End diff --
yup, I completely agree that off heap is not respected in the json format. can you file a bug? I think it's still relevant even after this goes in, for custom levels.
[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19848 @steveloughran can you bring this up on dev@? we should move this discussion off of this PR. (sorry, I haven't had a chance to look yet, but I appreciate you doing this)
[GitHub] spark issue #20039: [SPARK-22850][core] Ensure queued events are delivered t...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20039 merged to master / 2.3
[GitHub] spark issue #20117: [SPARK-22921][PROJECT-INFRA] Bug fix in jira assigning
Github user squito commented on the issue: https://github.com/apache/spark/pull/20117 merged to master