[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r115220025 --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala --- @@ -163,6 +173,8 @@ final class ShuffleBlockFetcherIterator( case _ => } } +freeMemory(getUsed) --- End diff -- Yes, I should refine. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r115219258 --- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala --- @@ -128,41 +130,52 @@ private[spark] class CompressedMapStatus( * @param numNonEmptyBlocks the number of non-empty blocks * @param emptyBlocks a bitmap tracking which blocks are empty * @param avgSize average size of the non-empty blocks + * @param hugeBlockSizesArray sizes of huge blocks by their reduceId. */ private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: RoaringBitmap, -private[this] var avgSize: Long) +private[this] var avgSize: Long, +private[this] var hugeBlockSizesArray: Array[Tuple2[Int, Byte]]) extends MapStatus with Externalizable { + @transient var hugeBlockSizes: Map[Int, Byte] = +if (hugeBlockSizesArray == null) null else hugeBlockSizesArray.toMap + // loc could be null when the default constructor is called during deserialization require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0, "Average size can only be zero for map stages that produced no output") - protected def this() = this(null, -1, null, -1) // For deserialization only + def this() = this(null, -1, null, -1, null) // For deserialization only override def location: BlockManagerId = loc override def getSizeForBlock(reduceId: Int): Long = { if (emptyBlocks.contains(reduceId)) { 0 } else { - avgSize + hugeBlockSizes.get(reduceId) match { +case Some(size) => MapStatus.decompressSize(size) +case None => avgSize + } } } override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { loc.writeExternal(out) emptyBlocks.writeExternal(out) out.writeLong(avgSize) +out.writeObject(hugeBlockSizesArray) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { loc = BlockManagerId(in) emptyBlocks = new RoaringBitmap() emptyBlocks.readExternal(in) avgSize = in.readLong() +hugeBlockSizesArray = in.readObject().asInstanceOf[Array[Tuple2[Int, Byte]]] +hugeBlockSizes = hugeBlockSizesArray.toMap --- End diff -- Yes, I think so. For example in "MapStatusSuite: HighlyCompressedMapStatus: estimated size should be the average non-empty block size". If I remove this line, after `compressAndDecompressMapStatus`, `hugeBlockSizes` is initialized to be null. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16989: [SPARK-19659] Fetch big blocks to disk when shuffle-read...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16989 @cloud-fan More comments on this ? :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r115021013 --- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala --- @@ -128,41 +130,52 @@ private[spark] class CompressedMapStatus( * @param numNonEmptyBlocks the number of non-empty blocks * @param emptyBlocks a bitmap tracking which blocks are empty * @param avgSize average size of the non-empty blocks + * @param hugeBlockSizesArray sizes of huge blocks by their reduceId. */ private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: RoaringBitmap, -private[this] var avgSize: Long) +private[this] var avgSize: Long, +private[this] var hugeBlockSizesArray: Array[Tuple2[Int, Byte]]) extends MapStatus with Externalizable { + @transient var hugeBlockSizes: Map[Int, Byte] = +if (hugeBlockSizesArray == null) null else hugeBlockSizesArray.toMap + // loc could be null when the default constructor is called during deserialization require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0, "Average size can only be zero for map stages that produced no output") - protected def this() = this(null, -1, null, -1) // For deserialization only + def this() = this(null, -1, null, -1, null) // For deserialization only override def location: BlockManagerId = loc override def getSizeForBlock(reduceId: Int): Long = { if (emptyBlocks.contains(reduceId)) { 0 } else { - avgSize + hugeBlockSizes.get(reduceId) match { +case Some(size) => MapStatus.decompressSize(size) +case None => avgSize + } } } override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { loc.writeExternal(out) emptyBlocks.writeExternal(out) out.writeLong(avgSize) +out.writeObject(hugeBlockSizesArray) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { loc = BlockManagerId(in) emptyBlocks = new RoaringBitmap() emptyBlocks.readExternal(in) avgSize = in.readLong() +hugeBlockSizesArray = in.readObject().asInstanceOf[Array[Tuple2[Int, Byte]]] +hugeBlockSizes = hugeBlockSizesArray.toMap --- End diff -- @cloud-fan >After the constructor is called, both hugeBlockSizes and hugeBlockSizesArray are initialized to be null, I think we need to initialize both in readExternal when deserialize. I still think I should keep this as it is here. Am I wrong? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r115018506 --- Diff: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala --- @@ -128,4 +130,23 @@ class MapStatusSuite extends SparkFunSuite { assert(size1 === size2) assert(!success) } + + test("Blocks which are bigger than 2 * average size should not be underestimated.") { +val sizes = Array.concat(Array.fill[Long](1000)(1L), (1000L to 2000L).toArray) +val status1 = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes) +val arrayStream = new ByteArrayOutputStream(102400) +val objectOutputStream = new ObjectOutputStream(arrayStream) +assert(status1.isInstanceOf[HighlyCompressedMapStatus]) + status1.asInstanceOf[HighlyCompressedMapStatus].writeExternal(objectOutputStream) --- End diff -- Yes, I should refine this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16989: [SPARK-19659] Fetch big blocks to disk when shuffle-read...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16989 @cloud-fan Really really thankful for reviewing this pr:). I've refined according to your comments. Please take another look at this when you have time. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r114967741 --- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala --- @@ -128,41 +130,52 @@ private[spark] class CompressedMapStatus( * @param numNonEmptyBlocks the number of non-empty blocks * @param emptyBlocks a bitmap tracking which blocks are empty * @param avgSize average size of the non-empty blocks + * @param hugeBlockSizesArray sizes of huge blocks by their reduceId. */ private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: RoaringBitmap, -private[this] var avgSize: Long) +private[this] var avgSize: Long, +private[this] var hugeBlockSizesArray: Array[Tuple2[Int, Byte]]) extends MapStatus with Externalizable { + @transient var hugeBlockSizes: Map[Int, Byte] = +if (hugeBlockSizesArray == null) null else hugeBlockSizesArray.toMap + // loc could be null when the default constructor is called during deserialization require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0, "Average size can only be zero for map stages that produced no output") - protected def this() = this(null, -1, null, -1) // For deserialization only + def this() = this(null, -1, null, -1, null) // For deserialization only override def location: BlockManagerId = loc override def getSizeForBlock(reduceId: Int): Long = { if (emptyBlocks.contains(reduceId)) { 0 } else { - avgSize + hugeBlockSizes.get(reduceId) match { +case Some(size) => MapStatus.decompressSize(size) +case None => avgSize + } } } override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { loc.writeExternal(out) emptyBlocks.writeExternal(out) out.writeLong(avgSize) +out.writeObject(hugeBlockSizesArray) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { loc = BlockManagerId(in) emptyBlocks = new RoaringBitmap() emptyBlocks.readExternal(in) avgSize = in.readLong() +hugeBlockSizesArray = in.readObject().asInstanceOf[Array[Tuple2[Int, Byte]]] +hugeBlockSizes = hugeBlockSizesArray.toMap --- End diff -- I cannot pass "MapStatusSuite: HighlyCompressedMapStatus: estimated size should be the average non-empty block size" if remove this line. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r114965988 --- Diff: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala --- @@ -401,4 +424,74 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT assert(id3 === ShuffleBlockId(0, 2, 0)) } + test("Blocks should be shuffled to disk when size is above the threshold, otherwise to memory.") { +val blockManager = mock(classOf[BlockManager]) +val localBmId = BlockManagerId("test-client", "test-client", 1) +doReturn(localBmId).when(blockManager).blockManagerId + +val diskBlockManager = mock(classOf[DiskBlockManager]) +doReturn(new File("shuffle-read-file")).when(diskBlockManager).getFile(any(classOf[String])) +doReturn(diskBlockManager).when(blockManager).diskBlockManager + +val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2) +val remoteBlocks = Map[BlockId, ManagedBuffer]( + ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer(), + ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer(), + ShuffleBlockId(0, 2, 0) -> createMockManagedBuffer()) +val transfer = mock(classOf[BlockTransferService]) +var shuffleFilesOpt: Option[Array[File]] = None +when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any())) + .thenAnswer(new Answer[Unit] { +override def answer(invocation: InvocationOnMock): Unit = { + val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener] + shuffleFilesOpt = invocation.getArguments()(5).asInstanceOf[Option[Array[File]]] + Future { +listener.onBlockFetchSuccess( + ShuffleBlockId(0, 0, 0).toString, remoteBlocks(ShuffleBlockId(0, 0, 0))) +listener.onBlockFetchSuccess( + ShuffleBlockId(0, 1, 0).toString, remoteBlocks(ShuffleBlockId(0, 1, 0))) +listener.onBlockFetchSuccess( + ShuffleBlockId(0, 2, 0).toString, remoteBlocks(ShuffleBlockId(0, 2, 0))) + } +} + }) +val taskMemoryManager = mock(classOf[TaskMemoryManager]) +when(taskMemoryManager.acquireExecutionMemory(any(), any())) + .thenAnswer(new Answer[Long] { +override def answer(invocationOnMock: InvocationOnMock): Long = 500L + }) + +val tc = new TaskContextImpl(0, 0, 0, 0, taskMemoryManager, new Properties, null) + +val blocksByAddress1 = Seq[(BlockManagerId, Seq[(BlockId, Long)])]( + (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 100.asInstanceOf[Long])).toSeq)) + +val iterator1 = new ShuffleBlockFetcherIterator( + tc, + transfer, + blockManager, + blocksByAddress1, + (_, in) => in, + 48 * 1024 * 1024, + Int.MaxValue, + true, + taskMemoryManager) + +assert(shuffleFilesOpt.isEmpty) +val blocksByAddress2 = Seq[(BlockManagerId, Seq[(BlockId, Long)])]( + (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 200.asInstanceOf[Long])).toSeq) --- End diff -- oh... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r114965950 --- Diff: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala --- @@ -401,4 +424,74 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT assert(id3 === ShuffleBlockId(0, 2, 0)) } + test("Blocks should be shuffled to disk when size is above the threshold, otherwise to memory.") { +val blockManager = mock(classOf[BlockManager]) +val localBmId = BlockManagerId("test-client", "test-client", 1) +doReturn(localBmId).when(blockManager).blockManagerId + +val diskBlockManager = mock(classOf[DiskBlockManager]) +doReturn(new File("shuffle-read-file")).when(diskBlockManager).getFile(any(classOf[String])) +doReturn(diskBlockManager).when(blockManager).diskBlockManager + +val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2) +val remoteBlocks = Map[BlockId, ManagedBuffer]( + ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer(), + ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer(), + ShuffleBlockId(0, 2, 0) -> createMockManagedBuffer()) +val transfer = mock(classOf[BlockTransferService]) +var shuffleFilesOpt: Option[Array[File]] = None +when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any())) + .thenAnswer(new Answer[Unit] { +override def answer(invocation: InvocationOnMock): Unit = { + val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener] + shuffleFilesOpt = invocation.getArguments()(5).asInstanceOf[Option[Array[File]]] + Future { +listener.onBlockFetchSuccess( + ShuffleBlockId(0, 0, 0).toString, remoteBlocks(ShuffleBlockId(0, 0, 0))) +listener.onBlockFetchSuccess( + ShuffleBlockId(0, 1, 0).toString, remoteBlocks(ShuffleBlockId(0, 1, 0))) +listener.onBlockFetchSuccess( + ShuffleBlockId(0, 2, 0).toString, remoteBlocks(ShuffleBlockId(0, 2, 0))) + } +} + }) +val taskMemoryManager = mock(classOf[TaskMemoryManager]) +when(taskMemoryManager.acquireExecutionMemory(any(), any())) + .thenAnswer(new Answer[Long] { +override def answer(invocationOnMock: InvocationOnMock): Long = 500L + }) + +val tc = new TaskContextImpl(0, 0, 0, 0, taskMemoryManager, new Properties, null) + +val blocksByAddress1 = Seq[(BlockManagerId, Seq[(BlockId, Long)])]( + (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 100.asInstanceOf[Long])).toSeq)) --- End diff -- My mistake --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r114965477 --- Diff: core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala --- @@ -126,11 +131,21 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext .set("spark.shuffle.compress", "false") .set("spark.shuffle.spill.compress", "false")) +val taskMemoryManager = mock(classOf[TaskMemoryManager]) +when(taskMemoryManager.acquireExecutionMemory(any(), any())) + .thenAnswer(new Answer[Long] { +override def answer(invocation: InvocationOnMock): Long = { + invocation.getArguments()(0).asInstanceOf[Long] +} + }) +taskMemoryManager --- End diff -- So sorry. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r114965327 --- Diff: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala --- @@ -128,4 +130,23 @@ class MapStatusSuite extends SparkFunSuite { assert(size1 === size2) assert(!success) } + + test("Blocks which are bigger than 2 * average size should not be underestimated.") { +val sizes = Array.concat(Array.fill[Long](1000)(1L), (1000L to 2000L).toArray) +val status1 = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes) +val arrayStream = new ByteArrayOutputStream(102400) +val objectOutputStream = new ObjectOutputStream(arrayStream) +assert(status1.isInstanceOf[HighlyCompressedMapStatus]) + status1.asInstanceOf[HighlyCompressedMapStatus].writeExternal(objectOutputStream) +objectOutputStream.flush() +val array = arrayStream.toByteArray +val objectInput = new ObjectInputStream(new ByteArrayInputStream(array)) +val status2 = new HighlyCompressedMapStatus() +status2.readExternal(objectInput) --- End diff -- I'm a little bit confused and hesitant here. We `writeExternal` for serialization and do initialization in `readExternal` when deserialize, right? `objectInput.readObject().asInstanceOf[HighlyCompressedMapStatus]` is not ok, I think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r114961196 --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala --- @@ -175,33 +181,41 @@ final class ShuffleBlockFetcherIterator( val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap val remainingBlocks = new HashSet[String]() ++= sizeMap.keys val blockIds = req.blocks.map(_._1.toString) - val address = req.address -shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray, - new BlockFetchingListener { -override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = { - // Only add the buffer to results queue if the iterator is not zombie, - // i.e. cleanup() has not been called yet. - ShuffleBlockFetcherIterator.this.synchronized { -if (!isZombie) { - // Increment the ref count because we need to pass this to a different thread. - // This needs to be released after use. - buf.retain() - remainingBlocks -= blockId - results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf, -remainingBlocks.isEmpty)) - logDebug("remainingBlocks: " + remainingBlocks) -} + +val blockFetchingListener = new BlockFetchingListener { + override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = { +// Only add the buffer to results queue if the iterator is not zombie, +// i.e. cleanup() has not been called yet. +ShuffleBlockFetcherIterator.this.synchronized { + if (!isZombie) { +// Increment the ref count because we need to pass this to a different thread. +// This needs to be released after use. +buf.retain() +remainingBlocks -= blockId +results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf, + remainingBlocks.isEmpty)) +logDebug("remainingBlocks: " + remainingBlocks) } - logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) } +logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) + } -override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = { - logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e) - results.put(new FailureFetchResult(BlockId(blockId), address, e)) -} + override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = { +logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e) +results.put(new FailureFetchResult(BlockId(blockId), address, e)) } -) +} +val acquired = acquireMemory(req.size) +if (acquired < req.size) { + freeMemory(acquired) + shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray, +blockFetchingListener, +Some(blockIds.map(bId => blockManager.diskBlockManager.getFile(s"remote-$bId")).toArray)) --- End diff -- Yes, I will refine. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r114960285 --- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala --- @@ -193,8 +206,18 @@ private[spark] object HighlyCompressedMapStatus { } else { 0 } +val hugeBlockSizes = HashMap[Int, Byte]() --- End diff -- Sure, I will refine this :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r114960041 --- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala --- @@ -128,41 +130,52 @@ private[spark] class CompressedMapStatus( * @param numNonEmptyBlocks the number of non-empty blocks * @param emptyBlocks a bitmap tracking which blocks are empty * @param avgSize average size of the non-empty blocks + * @param hugeBlockSizesArray sizes of huge blocks by their reduceId. */ private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: RoaringBitmap, -private[this] var avgSize: Long) +private[this] var avgSize: Long, +private[this] var hugeBlockSizesArray: Array[Tuple2[Int, Byte]]) extends MapStatus with Externalizable { + @transient var hugeBlockSizes: Map[Int, Byte] = +if (hugeBlockSizesArray == null) null else hugeBlockSizesArray.toMap + // loc could be null when the default constructor is called during deserialization require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0, "Average size can only be zero for map stages that produced no output") - protected def this() = this(null, -1, null, -1) // For deserialization only + def this() = this(null, -1, null, -1, null) // For deserialization only override def location: BlockManagerId = loc override def getSizeForBlock(reduceId: Int): Long = { if (emptyBlocks.contains(reduceId)) { 0 } else { - avgSize + hugeBlockSizes.get(reduceId) match { +case Some(size) => MapStatus.decompressSize(size) +case None => avgSize + } } } override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { loc.writeExternal(out) emptyBlocks.writeExternal(out) out.writeLong(avgSize) +out.writeObject(hugeBlockSizesArray) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { loc = BlockManagerId(in) emptyBlocks = new RoaringBitmap() emptyBlocks.readExternal(in) avgSize = in.readLong() +hugeBlockSizesArray = in.readObject().asInstanceOf[Array[Tuple2[Int, Byte]]] +hugeBlockSizes = hugeBlockSizesArray.toMap --- End diff -- After the constructor is called, both `hugeBlockSizes` and `hugeBlockSizesArray` are initialized to be null, I think we need to initialize both in `readExternal` when deserialize. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r114959211 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java --- @@ -126,4 +151,39 @@ private void failRemainingBlocks(String[] failedBlockIds, Throwable e) { } } } + + private class DownloadCallback implements StreamCallback { + +private WritableByteChannel channel = null; +private File targetFile = null; +private int chunkIndex; + +public DownloadCallback(File targetFile, int chunkIndex) throws IOException { + this.targetFile = targetFile; + this.channel = Channels.newChannel(new FileOutputStream(targetFile)); + this.chunkIndex = chunkIndex; +} + +@Override +public void onData(String streamId, ByteBuffer buf) throws IOException { + channel.write(buf); +} + +@Override +public void onComplete(String streamId) throws IOException { + channel.close(); + ManagedBuffer buffer = new FileSegmentManagedBuffer( --- End diff -- In current implementation, the files will be removed when app is stopped. All the local dirs will be removed. I tested this on my cluster :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r114943530 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java --- @@ -100,7 +114,14 @@ public void onSuccess(ByteBuffer response) { // Immediately request all chunks -- we expect that the total size of the request is // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]]. for (int i = 0; i < streamHandle.numChunks; i++) { -client.fetchChunk(streamHandle.streamId, i, chunkCallback); +if (fetchToDisk) { + final File targetFile = new File(".", --- End diff -- @cloud-fan I understood ~ I will refine, I will replace `boolean fetchToDisk` with `Option<File[]> shuffleFilesOpt`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16989: [SPARK-19659] Fetch big blocks to disk when shuffle-read...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16989 Jenkins, retest this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r114696480 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java --- @@ -100,7 +114,14 @@ public void onSuccess(ByteBuffer response) { // Immediately request all chunks -- we expect that the total size of the request is // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]]. for (int i = 0; i < streamHandle.numChunks; i++) { -client.fetchChunk(streamHandle.streamId, i, chunkCallback); +if (fetchToDisk) { + final File targetFile = new File(".", --- End diff -- @cloud-fan Yes, but `OneForOneBlockFetcher` is in `network-shuffle` package, I find it hard to import `SparkEnv` from `core` package. Did I miss something?(I'm sorry if this question is stupid.) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17744: [SPARK-20426] Lazy initialization of FileSegmentManagedB...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17744 Thanks again for help review this pr. Currently I'm not seeing memory issue on my nodemanagers. I'd report to community if there's new finding :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17744: [SPARK-20426] Lazy initialization of FileSegmentManagedB...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17744 @tgravescs Thanks a lot for merging. I proposed to resolve this by "Lazy initialization of FileSegmentManagedBuffer" and simplify the change. But after checking the code, could we remove `OpenBlocks`. In my understanding, `OpenBlocks` is just a handshake before fetching remote blocks and could be removed conceptually(did I miss something?). This is just a brainstorming and will be a bigger change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [WIP][SPARK-19659] Fetch big blocks to disk when ...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r114504511 --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala --- @@ -42,6 +46,12 @@ private[spark] class BlockStoreShuffleReader[K, C]( /** Read the combined key-values for this reduce task */ override def read(): Iterator[Product2[K, C]] = { +val memMode = --- End diff -- Yes, ideally this should be moved into `ShuffleBlockFetcherIterator`, but I didn't find a better implementation other than ``` extends MemoryConsumer(tmm, tmm.pageSizeBytes(), if (SparkTransportConf.fromSparkConf(SparkEnv.get.conf, "shuffle").preferDirectBufs()) { MemoryMode.OFF_HEAP } else { MemoryMode.ON_HEAP } ) ``` And I'd be a little bit hesitant to expose a 'setMode' in `MemoryConsumer` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [WIP][SPARK-19659] Fetch big blocks to disk when ...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r114503627 --- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala --- @@ -133,36 +135,53 @@ private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: RoaringBitmap, -private[this] var avgSize: Long) +private[this] var avgSize: Long, +private[this] var hugeBlockSizes: HashMap[Int, Byte]) --- End diff -- Yes, I will refine. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [WIP][SPARK-19659] Fetch big blocks to disk when ...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r114503557 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java --- @@ -126,4 +147,38 @@ private void failRemainingBlocks(String[] failedBlockIds, Throwable e) { } } } + + private class DownloadCallback implements StreamCallback { + +private WritableByteChannel channel = null; +private File targetFile = null; +private int chunkIndex; + +public DownloadCallback(File targetFile, int chunkIndex) throws IOException { + this.targetFile = targetFile; + this.channel = Channels.newChannel(new FileOutputStream(targetFile)); + this.chunkIndex = chunkIndex; +} + +@Override +public void onData(String streamId, ByteBuffer buf) throws IOException { + channel.write(buf); +} + +@Override +public void onComplete(String streamId) throws IOException { + channel.close(); + ManagedBuffer buffer = new FileSegmentManagedBuffer( +transportConf, targetFile, 0, targetFile.length()); + listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer); +} + +@Override +public void onFailure(String streamId, Throwable cause) throws IOException { --- End diff -- Yes, that will be good ! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [WIP][SPARK-19659] Fetch big blocks to disk when ...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r114503489 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java --- @@ -100,7 +114,14 @@ public void onSuccess(ByteBuffer response) { // Immediately request all chunks -- we expect that the total size of the request is // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]]. for (int i = 0; i < streamHandle.numChunks; i++) { -client.fetchChunk(streamHandle.streamId, i, chunkCallback); +if (fetchToDisk) { + final File targetFile = new File(".", --- End diff -- Yes, I wanted to use `DiskBlockManager.getFile`, but I found it's hard to import `DiskBlockManager` from `OneForOneBlockFetcher`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17634: [SPARK-20333] HashPartitioner should be compatible with ...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17634 @kayousterhout @mridulm Does this pr make sense? Could you please take a look this when you have time :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17744: [SPARK-20426] Lazy initialization of FileSegmentM...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/17744#discussion_r113356306 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java --- @@ -93,14 +92,25 @@ protected void handleMessage( OpenBlocks msg = (OpenBlocks) msgObj; checkAuth(client, msg.appId); -List blocks = Lists.newArrayList(); -long totalBlockSize = 0; -for (String blockId : msg.blockIds) { - final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId, blockId); - totalBlockSize += block != null ? block.size() : 0; - blocks.add(block); -} -long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator()); +Iterator iter = new Iterator() { + private int index = 0; + + @Override + public boolean hasNext() { +return index < msg.blockIds.length; + } + + @Override + public ManagedBuffer next() { +final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId, + msg.blockIds[index]); --- End diff -- @tgravescs Thanks a lot for taking time looking into this :) In my understanding, the `OpenBlocks` will be kept in heap after initialization(https://github.com/apache/spark/blob/master/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java#L84). Yes, `TransportRequestHandler.processRpcRequest` will release the `ByteBuf`, but the `OpenBlocks` will not be released. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17744: [SPARK-20426] Lazy initialization of FileSegmentManagedB...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17744 Spark jobs are running on yarn cluster in my warehouse. We enabled the external shuffle service(--conf spark.shuffle.service.enabled=true). Recently NodeManager runs OOM now and then. Dumping heap memory, we find that OneFroOneStreamManager's footprint is huge. NodeManager is configured with 5G heap memory. While OneForOneManager costs 2.5G and there are 5503233 FileSegmentManagedBuffer objects. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17744: [SPARK-20426] Lazy initialization of FileSegmentM...
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/17744 [SPARK-20426] Lazy initialization of FileSegmentManagedBuffer for shuffle service. ## What changes were proposed in this pull request? When application contains large amount of shuffle blocks. NodeManager requires lots of memory to keep metadata(`FileSegmentManagedBuffer`) in `StreamManager`. When the number of shuffle blocks is big enough. NodeManager can run OOM. This pr proposes to do lazy initialization of `FileSegmentManagedBuffer` in shuffle service. ## How was this patch tested? Manually test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-20426 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17744.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17744 commit 5fb91bfc5cdd588ae728a39173521279f517f20e Author: jinxing <jinxing6...@126.com> Date: 2017-04-24T12:52:00Z [SPARK-20426] Lazy initialization of FileSegmentManagedBuffer for shuffle service. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16989: [WIP][SPARK-19659] Fetch big blocks to disk when ...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16989#discussion_r111734780 --- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala --- @@ -133,36 +135,53 @@ private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: RoaringBitmap, -private[this] var avgSize: Long) +private[this] var avgSize: Long, +private[this] var hugeBlockSizes: HashMap[Int, Byte]) extends MapStatus with Externalizable { // loc could be null when the default constructor is called during deserialization require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0, "Average size can only be zero for map stages that produced no output") - protected def this() = this(null, -1, null, -1) // For deserialization only + def this() = this(null, -1, null, -1, null) // For deserialization only --- End diff -- Remove the `protected` and make this visible for test. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16989: [WIP][SPARK-19659] Fetch big blocks to disk when shuffle...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16989 Jenkins, test this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17533: [WIP][SPARK-20219] Schedule tasks based on size of input...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17533 I think the failed unit test can be fixed in https://github.com/apache/spark/pull/17634 and https://github.com/apache/spark/pull/17603 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17603: [SPARK-20288] Avoid generating the MapStatus by stageId ...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17603 @squito Could you help comment on this ? :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17634: [SPARK-20333] HashPartitioner should be compatible with ...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17634 @squito @srowen Could you help comment on this :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17533: [WIP][SPARK-20219] Schedule tasks based on size of input...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17533 @squito Thank you so much for reviewing thus far and sorry for the complexity I bring in. I tried to simplify the code according to your comment and please take another look when tests passed. :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/17533#discussion_r111546462 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -168,6 +169,8 @@ private[spark] class TaskSetManager( t.epoch = epoch } + private val sortedPendingTasks = new AtomicBoolean(false) --- End diff -- Yes, in current change, I do the ordering when create `TaskSet`, there is no change in `TSM` now. Thanks a lot for suggestion :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/17533#discussion_r111545406 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1080,6 +1122,25 @@ class DAGScheduler( } } + // Visible for testing. + private[spark] def getTaskInputSizesFromShuffledRDD(tasks: Seq[Task[_]]): Map[Task[_], Long] = { +val taskInputSizeFromShuffledRDD = HashMap[Task[_], Long]() +tasks.foreach { + case task => +val size = + parentSplitsInShuffledRDD(task.stageId, task.partitionId).map { +case parentSplits => + parentSplits.map { +case (shuffleId, splits) => + splits.map(mapOutputTracker.getMapSizesByExecutorId(shuffleId, _) +.flatMap(_._2.map(_._2)).sum).sum --- End diff -- >It also occurs to me that in general this could use the total input size for the task, but I guess spark isn't looking at that in general yet (though it probably could, from hadoop's InputSplit.getLength()). Just something to keep in mind. Agree :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/17533#discussion_r111545327 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1080,6 +1122,25 @@ class DAGScheduler( } } + // Visible for testing. + private[spark] def getTaskInputSizesFromShuffledRDD(tasks: Seq[Task[_]]): Map[Task[_], Long] = { --- End diff -- Yes, should be refined. :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/17533#discussion_r111545285 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -472,6 +472,47 @@ class DAGScheduler( } /** + * Get ancestor splits in ShuffledRDD. + */ + private[spark] def parentSplitsInShuffledRDD(stageId: Int, pId: Int): Option[Map[Int, Set[Int]]] = + { +stageIdToStage.get(stageId) match { + case Some(stage) => +val waitingForVisit = new Stack[Tuple2[RDD[_], Int]] +waitingForVisit.push((stage.rdd, pId)) +val ret = new HashMap[Int, HashSet[Int]]() +while(waitingForVisit.nonEmpty) { + val (rdd, split) = waitingForVisit.pop() + if (getCacheLocs(rdd)(split) == Nil) { +rdd.dependencies.foreach { + case dep: ShuffleDependency[_, _, _] => +val noPartitionerConflict = rdd.partitioner match { + case Some(partitioner) => +partitioner.isInstanceOf[HashPartitioner] && +dep.partitioner.isInstanceOf[HashPartitioner] && +partitioner.numPartitions == dep.partitioner.numPartitions --- End diff -- Yes, I always think rdd.partitioner should be the same with shuffle dependencies partitioner. But I found `CustomShuffledRDD` is a different one. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/17533#discussion_r111545019 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -472,6 +472,47 @@ class DAGScheduler( } /** + * Get ancestor splits in ShuffledRDD. + */ + private[spark] def parentSplitsInShuffledRDD(stageId: Int, pId: Int): Option[Map[Int, Set[Int]]] = --- End diff -- Yes, this is confusing and I need to refine this. I'm a little bit hesitant to use `getShuffleDependencies`. I need to get the total size of input from `ShuffledRDD` for every child's partition. After transformations like `CoalescedRDD`, there may be not a consistent one-to-one match between ancestor's partition index and child's partition index. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17634: [SPARK-20333] HashPartitioner should be compatible with ...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17634 I found this when doing https://github.com/apache/spark/pull/17533 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17634: [SPARK-20333] HashPartitioner should be compatibl...
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/17634 [SPARK-20333] HashPartitioner should be compatible with num of child RDD's partitions. ## What changes were proposed in this pull request? Fix test "don't submit stage until its dependencies map outputs are registered (SPARK-5259)" in DAGSchedulerSuite. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-20333 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17634.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17634 commit e9d82977b1bee6aa0573bd745985e97a81b8e514 Author: jinxing <jinxing6...@126.com> Date: 2017-04-14T04:00:31Z HashPartitioner should be compatible with num of child RDD's partitions. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17603: [SPARK-20288] Avoid generating the MapStatus by stageId ...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17603 I found this when test https://github.com/apache/spark/pull/17533. It failed now and then when try to get size of reduce from `MapStatus`. I'm not sure how to make it better: Modify the test as this pr or Change https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L380 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17603: [SPARK-20288] Avoid generating the MapStatus by s...
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/17603 [SPARK-20288] Avoid generating the MapStatus by stageId in BasicSchedulerIntegrationSuite ## What changes were proposed in this pull request? ShuffleId is determined before job submitted. But it's hard to predict stageId by shuffleId. Stage is created in DAGScheduler( https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L381), but the order is n ot determined. I added a log(println(s"Creating ShufflMapStage-$id on shuffle-${shuffleDep.shuffleId}")) after (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L331), when testing BasicSchedulerIntegrationSuite:"multi-stage job". It will print: Creating ShufflMapStage-0 on shuffle-0 Creating ShufflMapStage-1 on shuffle-2 Creating ShufflMapStage-2 on shuffle-1 Creating ShufflMapStage-3 on shuffle-3 or Creating ShufflMapStage-0 on shuffle-1 Creating ShufflMapStage-1 on shuffle-3 Creating ShufflMapStage-2 on shuffle-0 Creating ShufflMapStage-3 on shuffle-2 It might be better to avoid generating the MapStatus by stageId. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-20288 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17603.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17603 commit ea883f5d2b4bd63fae1f46bfe6ba78b352a894dc Author: jinxing <jinxing6...@126.com> Date: 2017-04-11T06:16:31Z [SPARK-20288] Avoid generating the MapStatus by stageId in BasicSchedulerIntegrationSuite --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17533: [SPARK-20219] Schedule tasks based on size of input from...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17533 @squito Thank you so much for taking look into this. > we don't want the TSM requesting info from the DAGSCheduler Sorry I missed this point for the previous change. Now I push the info(size of input from ShuffledRDD) when create TSM. Also I added a test in `DAGSchedulerSuite` to check the sizes are getting computed correctly. Thanks a lot again :) and hope I understand your comment correctly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17533: [SPARK-20219] Schedule tasks based on size of input from...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17533 @kayousterhout Thanks a lot for comment and sorry for late reply. I replied your comment from JIRA. Please take a look when you have time :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17533: [SPARK-20219] Schedule tasks based on size of inp...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/17533#discussion_r109930532 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -512,6 +522,57 @@ private[spark] class TaskSetManager( } } + private[this] def sortPendingTasks(): Unit = { +val taskIndexs = (0 until numTasks).toArray +implicit def ord = new Ordering[Int] { + override def compare(x: Int, y: Int): Int = +getTaskInputSizeFromShuffledRDD(tasks(x)) compare + getTaskInputSizeFromShuffledRDD(tasks(y)) +} +if (tasks.nonEmpty) { + // Sort the tasks based on their input size from ShuffledRDD. + pendingTasksForExecutor.foreach { +case (k, v) => pendingTasksForExecutor(k) = v.sorted + } + pendingTasksForHost.foreach { +case (k, v) => pendingTasksForHost(k) = v.sorted + } + pendingTasksForRack.foreach { +case (k, v) => pendingTasksForRack(k) = v.sorted + } + pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.sorted + allPendingTasks = allPendingTasks.sorted +} + } + + // Visible for testing + private[spark] def setTaskInputSizeFromShuffledRDD(inputSize: Map[Task[_], Long]) = { +taskInputSizeFromShuffledRDD.clear() +inputSize.foreach{ + case (k, v) => taskInputSizeFromShuffledRDD(k) = v +} + } + + private[this] def getTaskInputSizeFromShuffledRDD(task: Task[_]): Long = { +taskInputSizeFromShuffledRDD.get(task) match { + case Some(size) => size + case None => +val size = + sched.dagScheduler.parentSplitsInShuffledRDD(task.stageId, task.partitionId) match { --- End diff -- I got it :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/17533#discussion_r109901087 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -512,6 +522,57 @@ private[spark] class TaskSetManager( } } + private[this] def sortPendingTasks(): Unit = { +val taskIndexs = (0 until numTasks).toArray +implicit def ord = new Ordering[Int] { + override def compare(x: Int, y: Int): Int = +getTaskInputSizeFromShuffledRDD(tasks(x)) compare + getTaskInputSizeFromShuffledRDD(tasks(y)) +} +if (tasks.nonEmpty) { + // Sort the tasks based on their input size from ShuffledRDD. + pendingTasksForExecutor.foreach { +case (k, v) => pendingTasksForExecutor(k) = v.sorted + } + pendingTasksForHost.foreach { +case (k, v) => pendingTasksForHost(k) = v.sorted + } + pendingTasksForRack.foreach { +case (k, v) => pendingTasksForRack(k) = v.sorted + } + pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.sorted + allPendingTasks = allPendingTasks.sorted +} + } + + // Visible for testing + private[spark] def setTaskInputSizeFromShuffledRDD(inputSize: Map[Task[_], Long]) = { +taskInputSizeFromShuffledRDD.clear() +inputSize.foreach{ + case (k, v) => taskInputSizeFromShuffledRDD(k) = v +} + } + + private[this] def getTaskInputSizeFromShuffledRDD(task: Task[_]): Long = { +taskInputSizeFromShuffledRDD.get(task) match { + case Some(size) => size + case None => +val size = + sched.dagScheduler.parentSplitsInShuffledRDD(task.stageId, task.partitionId) match { --- End diff -- Yes, this should be made clearer. But sorry, I didn't find a function like `getOrElse(func, 0L)`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/17533#discussion_r109900096 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -438,6 +443,11 @@ private[spark] class TaskSetManager( blacklist.isExecutorBlacklistedForTaskSet(execId) } if (!isZombie && !offerBlacklisted) { + if (!sortedPendingTasks.get()) { --- End diff -- Yes, I should refine :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/17533#discussion_r109900019 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -512,6 +522,57 @@ private[spark] class TaskSetManager( } } + private[this] def sortPendingTasks(): Unit = { +val taskIndexs = (0 until numTasks).toArray +implicit def ord = new Ordering[Int] { --- End diff -- Yes, I think so:) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/17533#discussion_r109896244 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -512,6 +522,57 @@ private[spark] class TaskSetManager( } } + private[this] def sortPendingTasks(): Unit = { +val taskIndexs = (0 until numTasks).toArray +implicit def ord = new Ordering[Int] { + override def compare(x: Int, y: Int): Int = +getTaskInputSizeFromShuffledRDD(tasks(x)) compare + getTaskInputSizeFromShuffledRDD(tasks(y)) +} +if (tasks.nonEmpty) { + // Sort the tasks based on their input size from ShuffledRDD. + pendingTasksForExecutor.foreach { +case (k, v) => pendingTasksForExecutor(k) = v.sorted + } + pendingTasksForHost.foreach { +case (k, v) => pendingTasksForHost(k) = v.sorted + } + pendingTasksForRack.foreach { +case (k, v) => pendingTasksForRack(k) = v.sorted + } + pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.sorted + allPendingTasks = allPendingTasks.sorted +} + } + + // Visible for testing + private[spark] def setTaskInputSizeFromShuffledRDD(inputSize: Map[Task[_], Long]) = { +taskInputSizeFromShuffledRDD.clear() +inputSize.foreach{ --- End diff -- Yes, it should be refined :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/17533#discussion_r109893630 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -168,6 +169,10 @@ private[spark] class TaskSetManager( t.epoch = epoch } + val sortedPendingTasks = new AtomicBoolean(false) --- End diff -- Yes, it should be. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/17533#discussion_r109877754 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -138,7 +139,7 @@ private[spark] class TaskSetManager( private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int] // Set containing all pending tasks (also used as a stack, as above). - private val allPendingTasks = new ArrayBuffer[Int] + private var allPendingTasks = new ArrayBuffer[Int] --- End diff -- I make this `var`, because I don't have a better approach to sort ArrayBuffer in place. Advice? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17533: [WIP][SPARK-20219] Schedule tasks based on size of input...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17533 Yes, I did the test in my cluster. In highly-skew stage, the time cost can be reduced significantly. Tasks are scheduled with locality preference. But in current code, input size of tasks are not taken into consideration. Think about this scenario: 1. There are 9 partitions(0~8) in the ShuffledRDD and size of partition-8 is 8 times of the previous 8 partitions. (Lets assume that time cost of task has linear relation with the size of input and time cost of first 8 tasks is 1 and the time cost of the last task is 8.) 2. Tasks are scheduled on 2 executors. In current code, the tasks are scheduled in serial order and task for partition-8 will be the last one to launch and the time cost is 12. With this change, task for partition-8 will be scheduled first and the time cost will be reduced to 8. This change is related to SPARK-19100. In my prod env, skew situations happens mostly on ShuffledRDD. Thus this pr proposes to consider the size of input from ShuffledRDD when scheduling. This change can bring benefit when skew situations and won't have negative impact on performance in other scenarios. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17533: [SPARK-20219] Schedule tasks based on size of inp...
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/17533 [SPARK-20219] Schedule tasks based on size of input from ScheduledRDD ## What changes were proposed in this pull request? When data is highly skewed on `ShuffledRDD`, it make sense to launch those tasks which process much more input as soon as possible. The current scheduling mechanism in `TaskSetManager` is quite simple: ``` for (i <- (0 until numTasks).reverse) { addPendingTask(i) } ``` In scenario that "large tasks" locate at bottom half of tasks array, if tasks with much more input are launched early, we can significantly reduce the time cost and save resource when "dynamic allocation" is disabled. ## How was this patch tested? Added unit test in 'TaskSetManagerSuite'. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-20219 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17533.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17533 commit bca146c531060f32fa351397e13f0f778fe35b25 Author: jinxing <jinxing6...@126.com> Date: 2017-04-04T15:25:19Z Sort tasks based on their size. commit f757e4125935f7237a29bb07313ffb46ccbb3cd0 Author: jinxing <jinxing6...@126.com> Date: 2017-04-05T03:06:41Z Add unit test. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17112: [WIP] Measurement for SPARK-16929.
Github user jinxing64 closed the pull request at: https://github.com/apache/spark/pull/17112 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17276: [SPARK-19937] Collect metrics of block sizes when shuffl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17276 @mridulm Sorry for late reply. I opened the pr for SPARK-19659(https://github.com/apache/spark/pull/16989) and make these two PRs independent. Basically this pr is is to evaluate the performance(blocks are shuffled to disk) and stability(size in `MapStatus` is inaccurate and OOM can happen) of the implementation proposed in SPARK-19659. I'd be so thankful if you have time to comment on these two PRs. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17276: [SPARK-19937] Collect metrics of block sizes when...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/17276#discussion_r108061417 --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java --- @@ -169,6 +173,36 @@ public void write(Iterator<Product2<K, V>> records) throws IOException { } } mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); +if (mapStatus instanceof HighlyCompressedMapStatus) { + HighlyCompressedMapStatus hc = (HighlyCompressedMapStatus) mapStatus; + long underestimatedBlocksSize = 0L; + for (int i = 0; i < partitionLengths.length; i++) { +if (partitionLengths[i] > mapStatus.getSizeForBlock(i)) { + underestimatedBlocksSize += partitionLengths[i]; +} + } + writeMetrics.incUnderestimatedBlocksSize(underestimatedBlocksSize); + if (logger.isDebugEnabled() && partitionLengths.length > 0) { +int underestimatedBlocksNum = 0; +// Distribution of sizes in MapStatus. +double[] cp = new double[partitionLengths.length]; +for (int i = 0; i < partitionLengths.length; i++) { + cp[i] = partitionLengths[i]; + if (partitionLengths[i] > mapStatus.getSizeForBlock(i)) { +underestimatedBlocksNum++; + } +} +Distribution distribution = new Distribution(cp, 0, cp.length); +double[] probabilities = {0.0, 0.25, 0.5, 0.75, 1.0}; +String distributionStr = distribution.getQuantiles(probabilities).mkString(", "); +logger.debug("For task {}.{} in stage {} (TID {}), the block sizes in MapStatus are " + + "inaccurate (average is {}, {} blocks underestimated, size of underestimated is {})," + + " distribution at the given probabilities(0, 0.25, 0.5, 0.75, 1.0) is {}.", + taskContext.partitionId(), taskContext.attemptNumber(), taskContext.stageId(), + taskContext.taskAttemptId(), hc.getAvgSize(), + underestimatedBlocksNum, underestimatedBlocksSize, distributionStr); + } +} --- End diff -- In `CompressedMapStatus`, the blocks sizes are accurate, so I might hesitate to add that log. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17276: [SPARK-19937] Collect metrics of block sizes when shuffl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17276 @mridulm Thanks a lot for taking time looking into this and thanks for comments :) 1) I changed the size of underestimated blocks to be `partitionLengths.filter(_ > hc.getAvgSize).map(_ - hc.getAvgSize).sum` 2) I added a method `genBlocksDistributionStr` and call it from `ShuffleWriters`, thus avoid duplicate codes --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17276: [WIP][SPARK-19937] Collect metrics of block sizes when s...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17276 @squito Thanks a lot for taking time looking into this pr. I updated the pr. Currently just add two metrics: a) the total size of underestimated blocks size, b) the size of blocks shuffled to memory. For a), executor use `maxBytesInFlight` to control the speed of shuffle-read. I agree with your comment `another metric that may be nice to capture here is maximum underestimate`. But think about this scenario: the maximum is small, but thousands of blocks are underestimated, thus `maxBytesInFlight` cannot help avoid the OOM during shuffle-read. That's why I proposed to track the metrics of total size of underestimated blocks size; For b), currently all data are shuffled-read to memory. If we add the feature of shuffling to disk when memory shortage, we need to evaluate the performance. I think another two metrics need to be taken into account: the size of blocks shuffled to disk(to be added in another pr) and task's running time(already exist). The more data shuffled to memory, the better performance; The shorter time cost, the better performance. I also added some log for debug in `ShuffleWriter`, including the num of underestimated blocks and the size distribution. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 @kayousterhout @squito @mridulm Thanks for reviewing this ! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 @kayousterhout Thanks a lot for comments. I refined accordingly :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17312 @rxin Yes, I'm so confused by the second screenshot I posted. The only reason I can find is that the `stageData` in `ExecutorTable` is none thread safe. Size(2 executors) returned; maybe at this point, new executor is inserted. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17276: [WIP][SPARK-19937] Collect metrics of block sizes when s...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17276 You are so kind person. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17312 @rxin because I killed executor1 and it is not active during this stage. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17276: [SPARK-19937] Collect metrics of block sizes when shuffl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17276 @squito oh, I feel sorry if this is disturbing. I will mark it as wip. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17312 @harounemohammedi Thanks a lot for comment on this. I'm hesitate to include the `total time` in this pr. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17276: [SPARK-19937] Collect metrics of block sizes when shuffl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17276 @squito Thanks a lot for your comments and I will think and do the test carefully :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 @kayousterhout more comments? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16389: [SPARK-18981][Core]The job hang problem when speculation...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16389 @zhaorongsheng I think its better to just not reset `numRunningTasks` to 0. If we got some `ExecutorLostFailure`, the stage should not be marked as finished. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17312 I want to show the number of executors once active during the stage. `StageUIData` gets updated when receiving the hear beat from executor. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17312 @rxin @jerryshao @srowen I've refined the description and uploaded the screenshot of latest version. Please take another look. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17312 https://cloud.githubusercontent.com/assets/4058918/24134191/8392c5ea-0e3d-11e7-8a53-f164acf04764.png;> --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17312 Sure, that would be cool :) Thanks again you can help review this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17312 @rxin @jerryshao I uploaded another screenshot and give a short description there. Now it is (2 executors supplied). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17312 ![screenshot2](https://cloud.githubusercontent.com/assets/4058918/24127926/5e0e7294-0e13-11e7-8af0-434b05e2815a.png) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17312 @jerryshao Thanks a lot you can help review, really appreciate. I will give a description soon. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 @squito Thanks :) already refined. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 @mridulm Thanks a lot for helping review this : ) really appreciate. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17312 @rxin Thanks a lot. I added a number after `Aggregated Metrics by Executor` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17312 ![screenshot](https://cloud.githubusercontent.com/assets/4058918/24069386/0f556622-0be2-11e7-9f48-cc096cdd7d9b.png) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 @squito Sure. I did test for 100k tasks. The results are as below: | | time cost | | --| -- | | insert | 135ms, 122ms, 119ms, 120ms, 163ms | | `checkSpeculatableTasks` | 6ms, 6ms, 6ms, 5ms, 6ms | --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17276: [SPARK-19937] Collect metrics of block sizes when shuffl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17276 @squito Would you mind help comment on this when have time ? :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r106453502 --- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import java.util.Arrays +import java.util.NoSuchElementException + +import scala.collection.mutable.ArrayBuffer +import scala.util.Random + +import org.apache.spark.SparkFunSuite + +class MedianHeapSuite extends SparkFunSuite { + + test("If no numbers in MedianHeap, NoSuchElementException is thrown.") { +val medianHeap = new MedianHeap() +var valid = false +try { + medianHeap.median +} catch { + case e: NoSuchElementException => +valid = true +} + +assert(valid) + } + + test("Median should be correct when size of MedianHeap is even") { +val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) +val medianHeap = new MedianHeap() +array.foreach(medianHeap.insert(_)) +assert(medianHeap.size() === 10) +assert(medianHeap.median === ((array(4) + array(5)) / 2.0)) + } + + test("Median should be correct when size of MedianHeap is odd") { +val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8) +val medianHeap = new MedianHeap() +array.foreach(medianHeap.insert(_)) +assert(medianHeap.size() === 9) +assert(medianHeap.median === (array(4))) + } + + test("Size of Median should be correct though there are duplicated numbers inside.") { +val array = Array(0, 0, 1, 1, 2, 2, 3, 3, 4, 4) +val medianHeap = new MedianHeap() +array.foreach(medianHeap.insert(_)) +Arrays.sort(array) +assert(medianHeap.size === 10) +assert(medianHeap.median === ((array(4) + array(5)) / 2.0)) + } --- End diff -- Yes, I added this change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r106453205 --- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import java.util.Arrays +import java.util.NoSuchElementException + +import scala.collection.mutable.ArrayBuffer +import scala.util.Random + +import org.apache.spark.SparkFunSuite + +class MedianHeapSuite extends SparkFunSuite { + + test("If no numbers in MedianHeap, NoSuchElementException is thrown.") { +val medianHeap = new MedianHeap() +var valid = false +try { + medianHeap.median +} catch { + case e: NoSuchElementException => +valid = true +} --- End diff -- Thanks a lot for the recommendation :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r106433273 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -172,7 +172,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( if (!isLocal && conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") - speculationScheduler.scheduleAtFixedRate(new Runnable { + speculationScheduler.scheduleWithFixedDelay(new Runnable { --- End diff -- @squito Thanks a lot for looking into this. I will put the change in another pr : ) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17312 The executor metrics are updated to `StageUIData` when receive executor hear beat. Yes, the longevity of the executor may not cover the whole stage, but it was once active during the stage. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17312: [SPARK-19973] Display num of executors for the st...
GitHub user jinxing64 reopened a pull request: https://github.com/apache/spark/pull/17312 [SPARK-19973] Display num of executors for the stage. ## What changes were proposed in this pull request? In `StagePage` the total num of executors are not displayed. Since executorId may not be consecutive. It is useful to display the total number, which is useful when check how many executors are supplied during the stage. ## How was this patch tested? Manually test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-19973 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17312.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17312 commit 7aa6fe53c58601285499fd94d7587df7a5ac7477 Author: jinxing <jinxing6...@126.com> Date: 2017-03-16T10:09:48Z [SPARK-19973] Display num of executors for the stage. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17312: [SPARK-19973] Display num of executors for the st...
Github user jinxing64 closed the pull request at: https://github.com/apache/spark/pull/17312 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17312: Display num of executors for the stage.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17312 @srowen Thanks a lot for quick reply. When we check the reason why a stage ran today much longer than yesterday, we want to know how many executors are supplied. We don't want to count the executor one by one. A total number of executors make sense here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17312: Display num of executors for the stage.
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/17312 Display num of executors for the stage. ## What changes were proposed in this pull request? In `StagePage` the total num of executors are not displayed. Since executorId may not be consecutive. It is useful to display the total number, which is useful when check how many executors are supplied during the stage. ## How was this patch tested? Manually test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-19973 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17312.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17312 commit f64691c0007408d95803019530c2352cecbf7c9e Author: jinxing <jinxing6...@126.com> Date: 2017-03-16T10:09:48Z Display num of executors for the stage. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 @kayousterhout Thanks a lot for the comments :) very helpful. I've refined, please take another look when you have time. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r106340513 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -893,6 +893,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val taskSet = FakeTask.createTaskSet(4) // Set the speculation multiplier to be 0 so speculative tasks are launched immediately sc.conf.set("spark.speculation.multiplier", "0.0") +sc.conf.set("spark.speculation", "true") --- End diff -- This should be set. Because the duration is inserted to `MedianHeap` only when `spark.speculation`(e.g. If I remove this, `MedianHeap` will be empty when call `checkSpeculatableTasks`). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r106340321 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -172,7 +172,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( if (!isLocal && conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") - speculationScheduler.scheduleAtFixedRate(new Runnable { + speculationScheduler.scheduleWithFixedDelay(new Runnable { --- End diff -- I was thinking `checkSpeculatableTasks` will synchronize `TaskSchedulerImpl`. If `checkSpeculatableTasks` doesn't finish with 100ms, then the possibility exists for that thread to release and then immediately re-acquire the lock. Should this be included in this pr? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 @kayousterhout @mridulm More comments on this ? :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 @squito Thanks a lot for comments. I've refined :):) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17276: [WIP][SPARK-19937] Collect metrics of block sizes...
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/17276 [WIP][SPARK-19937] Collect metrics of block sizes when shuffle. ## What changes were proposed in this pull request? Metrics of blocks sizes(when shuffle) should be collected for later analysis. This is helpful for analysis when skew situations or OOM happens(though maxBytesInFlight is set). This pr proposes to: 1. Store the distribution of sizes in `MapStatus` and count the block sizes in ranges [0, 1k), [1k, 10k), [10k, 100k), [100k, 1m), [1m, 10m), [10m, 100m), [100m, 1g), [1g, 10g), [10g, Long.MaxValue). 2. Record the inaccuracy of block sizes. Because `HighlyCompressedMapStatus` is returned and only average size is recorded when block sizes is over 2000. ## How was this patch tested? You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-19937 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17276.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17276 commit 430ec95291393f49096fc07df98c15e700f23e8c Author: jinxing <jinxing6...@126.com> Date: 2017-03-13T13:35:51Z [SPARK-19937] Collect metrics of block sizes when shuffle. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 @squito Sorry, it seems like something went wrong when I did merge and try resolve the conflict. I squashed the commits and did rebase. It seems ok now. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17133: [SPARK-19793] Use clock.getTimeMillis when mark task as ...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17133 @vanzin @srowen I refined according to the comments, please take a look when you have time :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 @kayousterhout @squito @mridulm I refined according comments. Please take a look when you have time :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17208: [SPARK-19868] conflict TasksetManager lead to spark stop...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17208 @squito Thanks for notification :) this is not in my pr. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org