[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...

2017-05-08 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r115220025
  
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -163,6 +173,8 @@ final class ShuffleBlockFetcherIterator(
 case _ =>
   }
 }
+freeMemory(getUsed)
--- End diff --

Yes, I should refine this.





[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...

2017-05-08 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r115219258
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala ---
@@ -128,41 +130,52 @@ private[spark] class CompressedMapStatus(
  * @param numNonEmptyBlocks the number of non-empty blocks
  * @param emptyBlocks a bitmap tracking which blocks are empty
  * @param avgSize average size of the non-empty blocks
+ * @param hugeBlockSizesArray sizes of huge blocks by their reduceId.
  */
 private[spark] class HighlyCompressedMapStatus private (
 private[this] var loc: BlockManagerId,
 private[this] var numNonEmptyBlocks: Int,
 private[this] var emptyBlocks: RoaringBitmap,
-private[this] var avgSize: Long)
+private[this] var avgSize: Long,
+private[this] var hugeBlockSizesArray: Array[Tuple2[Int, Byte]])
   extends MapStatus with Externalizable {
 
+  @transient var hugeBlockSizes: Map[Int, Byte] =
+if (hugeBlockSizesArray == null) null else hugeBlockSizesArray.toMap
+
   // loc could be null when the default constructor is called during deserialization
   require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
 "Average size can only be zero for map stages that produced no output")
 
-  protected def this() = this(null, -1, null, -1)  // For deserialization only
+  def this() = this(null, -1, null, -1, null)  // For deserialization only
 
   override def location: BlockManagerId = loc
 
   override def getSizeForBlock(reduceId: Int): Long = {
 if (emptyBlocks.contains(reduceId)) {
   0
 } else {
-  avgSize
+  hugeBlockSizes.get(reduceId) match {
+case Some(size) => MapStatus.decompressSize(size)
+case None => avgSize
+  }
 }
   }
 
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
 loc.writeExternal(out)
 emptyBlocks.writeExternal(out)
 out.writeLong(avgSize)
+out.writeObject(hugeBlockSizesArray)
   }
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
 loc = BlockManagerId(in)
 emptyBlocks = new RoaringBitmap()
 emptyBlocks.readExternal(in)
 avgSize = in.readLong()
+hugeBlockSizesArray = in.readObject().asInstanceOf[Array[Tuple2[Int, Byte]]]
+hugeBlockSizes = hugeBlockSizesArray.toMap
--- End diff --

Yes, I think so. For example, in "MapStatusSuite: HighlyCompressedMapStatus: 
estimated size should be the average non-empty block size": if I remove this 
line, then after `compressAndDecompressMapStatus`, `hugeBlockSizes` stays null.
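
For reference, a minimal standalone sketch (illustrative names, not Spark code) of the failure mode: a `@transient` field derived from the serialized one stays null after a round trip unless `readExternal` rebuilds it, which is exactly what this line does for `hugeBlockSizes`.

```scala
import java.io._

// Sketch: an Externalizable class with a derived, @transient field.
class Status(private var sizesArray: Array[(Int, Byte)]) extends Externalizable {
  @transient var sizes: Map[Int, Byte] =
    if (sizesArray == null) null else sizesArray.toMap

  def this() = this(null)  // deserialization invokes this, leaving `sizes` null

  override def writeExternal(out: ObjectOutput): Unit = out.writeObject(sizesArray)

  override def readExternal(in: ObjectInput): Unit = {
    sizesArray = in.readObject().asInstanceOf[Array[(Int, Byte)]]
    sizes = sizesArray.toMap  // dropping this line reproduces the failure
  }
}

object RoundTrip extends App {
  val bytes = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bytes)
  oos.writeObject(new Status(Array(7 -> 42.toByte)))
  oos.flush()
  val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  val restored = ois.readObject().asInstanceOf[Status]
  assert(restored.sizes(7) == 42.toByte)  // NPE here without the rebuild above
}
```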





[GitHub] spark issue #16989: [SPARK-19659] Fetch big blocks to disk when shuffle-read...

2017-05-08 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16989
  
@cloud-fan 
Any more comments on this? :)





[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...

2017-05-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r115021013
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala ---
@@ -128,41 +130,52 @@ private[spark] class CompressedMapStatus(
  * @param numNonEmptyBlocks the number of non-empty blocks
  * @param emptyBlocks a bitmap tracking which blocks are empty
  * @param avgSize average size of the non-empty blocks
+ * @param hugeBlockSizesArray sizes of huge blocks by their reduceId.
  */
 private[spark] class HighlyCompressedMapStatus private (
 private[this] var loc: BlockManagerId,
 private[this] var numNonEmptyBlocks: Int,
 private[this] var emptyBlocks: RoaringBitmap,
-private[this] var avgSize: Long)
+private[this] var avgSize: Long,
+private[this] var hugeBlockSizesArray: Array[Tuple2[Int, Byte]])
   extends MapStatus with Externalizable {
 
+  @transient var hugeBlockSizes: Map[Int, Byte] =
+if (hugeBlockSizesArray == null) null else hugeBlockSizesArray.toMap
+
   // loc could be null when the default constructor is called during deserialization
   require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
 "Average size can only be zero for map stages that produced no output")
 
-  protected def this() = this(null, -1, null, -1)  // For deserialization only
+  def this() = this(null, -1, null, -1, null)  // For deserialization only
 
   override def location: BlockManagerId = loc
 
   override def getSizeForBlock(reduceId: Int): Long = {
 if (emptyBlocks.contains(reduceId)) {
   0
 } else {
-  avgSize
+  hugeBlockSizes.get(reduceId) match {
+case Some(size) => MapStatus.decompressSize(size)
+case None => avgSize
+  }
 }
   }
 
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
 loc.writeExternal(out)
 emptyBlocks.writeExternal(out)
 out.writeLong(avgSize)
+out.writeObject(hugeBlockSizesArray)
   }
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
 loc = BlockManagerId(in)
 emptyBlocks = new RoaringBitmap()
 emptyBlocks.readExternal(in)
 avgSize = in.readLong()
+hugeBlockSizesArray = in.readObject().asInstanceOf[Array[Tuple2[Int, Byte]]]
+hugeBlockSizes = hugeBlockSizesArray.toMap
--- End diff --

@cloud-fan 
> After the constructor is called, both hugeBlockSizes and hugeBlockSizesArray are null, so I think we need to initialize both in readExternal when deserializing.

I still think I should keep this line as it is. Am I wrong?





[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...

2017-05-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r115018506
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala ---
@@ -128,4 +130,23 @@ class MapStatusSuite extends SparkFunSuite {
 assert(size1 === size2)
 assert(!success)
   }
+
+  test("Blocks which are bigger than 2 * average size should not be 
underestimated.") {
+val sizes = Array.concat(Array.fill[Long](1000)(1L), (1000L to 2000L).toArray)
+val status1 = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes)
+val arrayStream = new ByteArrayOutputStream(102400)
+val objectOutputStream = new ObjectOutputStream(arrayStream)
+assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+status1.asInstanceOf[HighlyCompressedMapStatus].writeExternal(objectOutputStream)
--- End diff --

Yes, I should refine this.





[GitHub] spark issue #16989: [SPARK-19659] Fetch big blocks to disk when shuffle-read...

2017-05-05 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16989
  
@cloud-fan 
Thanks so much for reviewing this PR :). I've refined it according to 
your comments; please take another look when you have time.





[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...

2017-05-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r114967741
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala ---
@@ -128,41 +130,52 @@ private[spark] class CompressedMapStatus(
  * @param numNonEmptyBlocks the number of non-empty blocks
  * @param emptyBlocks a bitmap tracking which blocks are empty
  * @param avgSize average size of the non-empty blocks
+ * @param hugeBlockSizesArray sizes of huge blocks by their reduceId.
  */
 private[spark] class HighlyCompressedMapStatus private (
 private[this] var loc: BlockManagerId,
 private[this] var numNonEmptyBlocks: Int,
 private[this] var emptyBlocks: RoaringBitmap,
-private[this] var avgSize: Long)
+private[this] var avgSize: Long,
+private[this] var hugeBlockSizesArray: Array[Tuple2[Int, Byte]])
   extends MapStatus with Externalizable {
 
+  @transient var hugeBlockSizes: Map[Int, Byte] =
+if (hugeBlockSizesArray == null) null else hugeBlockSizesArray.toMap
+
   // loc could be null when the default constructor is called during deserialization
   require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
 "Average size can only be zero for map stages that produced no output")
 
-  protected def this() = this(null, -1, null, -1)  // For deserialization only
+  def this() = this(null, -1, null, -1, null)  // For deserialization only
 
   override def location: BlockManagerId = loc
 
   override def getSizeForBlock(reduceId: Int): Long = {
 if (emptyBlocks.contains(reduceId)) {
   0
 } else {
-  avgSize
+  hugeBlockSizes.get(reduceId) match {
+case Some(size) => MapStatus.decompressSize(size)
+case None => avgSize
+  }
 }
   }
 
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
 loc.writeExternal(out)
 emptyBlocks.writeExternal(out)
 out.writeLong(avgSize)
+out.writeObject(hugeBlockSizesArray)
   }
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
 loc = BlockManagerId(in)
 emptyBlocks = new RoaringBitmap()
 emptyBlocks.readExternal(in)
 avgSize = in.readLong()
+hugeBlockSizesArray = in.readObject().asInstanceOf[Array[Tuple2[Int, Byte]]]
+hugeBlockSizes = hugeBlockSizesArray.toMap
--- End diff --

I cannot pass "MapStatusSuite: HighlyCompressedMapStatus: estimated size 
should be the average non-empty block size" if I remove this line.





[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...

2017-05-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r114965988
  
--- Diff: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala ---
@@ -401,4 +424,74 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodTester
 assert(id3 === ShuffleBlockId(0, 2, 0))
   }
 
+  test("Blocks should be shuffled to disk when size is above the 
threshold, otherwise to memory.") {
+val blockManager = mock(classOf[BlockManager])
+val localBmId = BlockManagerId("test-client", "test-client", 1)
+doReturn(localBmId).when(blockManager).blockManagerId
+
+val diskBlockManager = mock(classOf[DiskBlockManager])
+doReturn(new File("shuffle-read-file")).when(diskBlockManager).getFile(any(classOf[String]))
+doReturn(diskBlockManager).when(blockManager).diskBlockManager
+
+val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+val remoteBlocks = Map[BlockId, ManagedBuffer](
+  ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer(),
+  ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer(),
+  ShuffleBlockId(0, 2, 0) -> createMockManagedBuffer())
+val transfer = mock(classOf[BlockTransferService])
+var shuffleFilesOpt: Option[Array[File]] = None
+when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
+  .thenAnswer(new Answer[Unit] {
+override def answer(invocation: InvocationOnMock): Unit = {
+  val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
+  shuffleFilesOpt = invocation.getArguments()(5).asInstanceOf[Option[Array[File]]]
+  Future {
+listener.onBlockFetchSuccess(
+  ShuffleBlockId(0, 0, 0).toString, remoteBlocks(ShuffleBlockId(0, 0, 0)))
+listener.onBlockFetchSuccess(
+  ShuffleBlockId(0, 1, 0).toString, remoteBlocks(ShuffleBlockId(0, 1, 0)))
+listener.onBlockFetchSuccess(
+  ShuffleBlockId(0, 2, 0).toString, remoteBlocks(ShuffleBlockId(0, 2, 0)))
+  }
+}
+  })
+val taskMemoryManager = mock(classOf[TaskMemoryManager])
+when(taskMemoryManager.acquireExecutionMemory(any(), any()))
+  .thenAnswer(new Answer[Long] {
+override def answer(invocationOnMock: InvocationOnMock): Long = 500L
+  })
+
+val tc = new TaskContextImpl(0, 0, 0, 0, taskMemoryManager, new Properties, null)
+
+val blocksByAddress1 = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
+  (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 100.asInstanceOf[Long])).toSeq))
+
+val iterator1 = new ShuffleBlockFetcherIterator(
+  tc,
+  transfer,
+  blockManager,
+  blocksByAddress1,
+  (_, in) => in,
+  48 * 1024 * 1024,
+  Int.MaxValue,
+  true,
+  taskMemoryManager)
+
+assert(shuffleFilesOpt.isEmpty)
+val blocksByAddress2 = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
+  (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 200.asInstanceOf[Long])).toSeq)
--- End diff --

oh...





[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...

2017-05-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r114965950
  
--- Diff: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala ---
@@ -401,4 +424,74 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodTester
 assert(id3 === ShuffleBlockId(0, 2, 0))
   }
 
+  test("Blocks should be shuffled to disk when size is above the 
threshold, otherwise to memory.") {
+val blockManager = mock(classOf[BlockManager])
+val localBmId = BlockManagerId("test-client", "test-client", 1)
+doReturn(localBmId).when(blockManager).blockManagerId
+
+val diskBlockManager = mock(classOf[DiskBlockManager])
+doReturn(new File("shuffle-read-file")).when(diskBlockManager).getFile(any(classOf[String]))
+doReturn(diskBlockManager).when(blockManager).diskBlockManager
+
+val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+val remoteBlocks = Map[BlockId, ManagedBuffer](
+  ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer(),
+  ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer(),
+  ShuffleBlockId(0, 2, 0) -> createMockManagedBuffer())
+val transfer = mock(classOf[BlockTransferService])
+var shuffleFilesOpt: Option[Array[File]] = None
+when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
+  .thenAnswer(new Answer[Unit] {
+override def answer(invocation: InvocationOnMock): Unit = {
+  val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
+  shuffleFilesOpt = invocation.getArguments()(5).asInstanceOf[Option[Array[File]]]
+  Future {
+listener.onBlockFetchSuccess(
+  ShuffleBlockId(0, 0, 0).toString, remoteBlocks(ShuffleBlockId(0, 0, 0)))
+listener.onBlockFetchSuccess(
+  ShuffleBlockId(0, 1, 0).toString, remoteBlocks(ShuffleBlockId(0, 1, 0)))
+listener.onBlockFetchSuccess(
+  ShuffleBlockId(0, 2, 0).toString, remoteBlocks(ShuffleBlockId(0, 2, 0)))
+  }
+}
+  })
+val taskMemoryManager = mock(classOf[TaskMemoryManager])
+when(taskMemoryManager.acquireExecutionMemory(any(), any()))
+  .thenAnswer(new Answer[Long] {
+override def answer(invocationOnMock: InvocationOnMock): Long = 500L
+  })
+
+val tc = new TaskContextImpl(0, 0, 0, 0, taskMemoryManager, new Properties, null)
+
+val blocksByAddress1 = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
+  (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 100.asInstanceOf[Long])).toSeq))
--- End diff --

My mistake.





[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...

2017-05-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r114965477
  
--- Diff: core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala ---
@@ -126,11 +131,21 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext
 .set("spark.shuffle.compress", "false")
 .set("spark.shuffle.spill.compress", "false"))
 
+val taskMemoryManager = mock(classOf[TaskMemoryManager])
+when(taskMemoryManager.acquireExecutionMemory(any(), any()))
+  .thenAnswer(new Answer[Long] {
+override def answer(invocation: InvocationOnMock): Long = {
+  invocation.getArguments()(0).asInstanceOf[Long]
+}
+  })
+taskMemoryManager
--- End diff --

So sorry.





[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...

2017-05-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r114965327
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala ---
@@ -128,4 +130,23 @@ class MapStatusSuite extends SparkFunSuite {
 assert(size1 === size2)
 assert(!success)
   }
+
+  test("Blocks which are bigger than 2 * average size should not be 
underestimated.") {
+val sizes = Array.concat(Array.fill[Long](1000)(1L), (1000L to 2000L).toArray)
+val status1 = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes)
+val arrayStream = new ByteArrayOutputStream(102400)
+val objectOutputStream = new ObjectOutputStream(arrayStream)
+assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+status1.asInstanceOf[HighlyCompressedMapStatus].writeExternal(objectOutputStream)
+objectOutputStream.flush()
+val array = arrayStream.toByteArray
+val objectInput = new ObjectInputStream(new ByteArrayInputStream(array))
+val status2 = new HighlyCompressedMapStatus()
+status2.readExternal(objectInput)
--- End diff --

I'm a little confused and hesitant here.
We call `writeExternal` for serialization and do the initialization in 
`readExternal` when deserializing, right?
So `objectInput.readObject().asInstanceOf[HighlyCompressedMapStatus]` is not 
OK here, I think?
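
For what it's worth, a small self-contained sketch (generic `Box`, not the Spark class) of why `readObject` also works for an `Externalizable` class, provided the object was written with `writeObject` rather than a bare `writeExternal` call: `ObjectInputStream` invokes the public no-arg constructor and then `readExternal`.

```scala
import java.io._

class Box(var value: Int) extends Externalizable {
  def this() = this(-1)  // public no-arg constructor required by Externalizable
  override def writeExternal(out: ObjectOutput): Unit = out.writeInt(value)
  override def readExternal(in: ObjectInput): Unit = { value = in.readInt() }
}

object Demo extends App {
  val buf = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(buf)
  oos.writeObject(new Box(7))  // writes a class descriptor, then calls writeExternal
  oos.flush()
  val ois = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
  val box = ois.readObject().asInstanceOf[Box]  // no-arg ctor, then readExternal
  assert(box.value == 7)
}
```

The two styles have to be used as matched pairs, though: a bare `writeExternal` writes no class descriptor, so it can only be read back with an explicit `new` plus `readExternal`, as the test above does.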





[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...

2017-05-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r114961196
  
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -175,33 +181,41 @@ final class ShuffleBlockFetcherIterator(
 val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap
 val remainingBlocks = new HashSet[String]() ++= sizeMap.keys
 val blockIds = req.blocks.map(_._1.toString)
-
 val address = req.address
-shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
-  new BlockFetchingListener {
-override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
-  // Only add the buffer to results queue if the iterator is not zombie,
-  // i.e. cleanup() has not been called yet.
-  ShuffleBlockFetcherIterator.this.synchronized {
-if (!isZombie) {
-  // Increment the ref count because we need to pass this to a different thread.
-  // This needs to be released after use.
-  buf.retain()
-  remainingBlocks -= blockId
-  results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf,
-remainingBlocks.isEmpty))
-  logDebug("remainingBlocks: " + remainingBlocks)
-}
+
+val blockFetchingListener = new BlockFetchingListener {
+  override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
+// Only add the buffer to results queue if the iterator is not zombie,
+// i.e. cleanup() has not been called yet.
+ShuffleBlockFetcherIterator.this.synchronized {
+  if (!isZombie) {
+// Increment the ref count because we need to pass this to a different thread.
+// This needs to be released after use.
+buf.retain()
+remainingBlocks -= blockId
+results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf,
+  remainingBlocks.isEmpty))
+logDebug("remainingBlocks: " + remainingBlocks)
   }
-  logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
 }
+logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
+  }
 
-override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
-  logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
-  results.put(new FailureFetchResult(BlockId(blockId), address, e))
-}
+  override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
+logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
+results.put(new FailureFetchResult(BlockId(blockId), address, e))
   }
-)
+}
+val acquired = acquireMemory(req.size)
+if (acquired < req.size) {
+  freeMemory(acquired)
+  shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
+blockFetchingListener,
+Some(blockIds.map(bId => blockManager.diskBlockManager.getFile(s"remote-$bId")).toArray))
--- End diff --

Yes, I will refine.





[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...

2017-05-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r114960285
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala ---
@@ -193,8 +206,18 @@ private[spark] object HighlyCompressedMapStatus {
 } else {
   0
 }
+val hugeBlockSizes = HashMap[Int, Byte]()
--- End diff --

Sure, I will refine this :)
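
For context, a sketch of the log-scale encoding that `MapStatus.compressSize`/`decompressSize` use to fit a block size into the one `Byte` stored per huge block (the log base 1.1 mirrors Spark; treat the exact edge cases as assumptions):

```scala
object SizeCodec {
  private val LogBase = 1.1  // one byte then spans sizes up to ~1.1^255, about 35 GB

  def compressSize(size: Long): Byte = {
    if (size == 0) 0
    else if (size <= 1L) 1
    else math.min(255, math.ceil(math.log(size.toDouble) / math.log(LogBase)).toInt).toByte
  }

  def decompressSize(compressed: Byte): Long = {
    if (compressed == 0) 0
    else math.pow(LogBase, compressed & 0xFF).toLong  // & 0xFF undoes the signed Byte
  }
}
```

The price is at most roughly 10% relative error per recorded block, which is acceptable here since only blocks larger than 2 * average get an entry at all, and the ceil rounds up, so sizes are never underestimated.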





[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...

2017-05-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r114960041
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala ---
@@ -128,41 +130,52 @@ private[spark] class CompressedMapStatus(
  * @param numNonEmptyBlocks the number of non-empty blocks
  * @param emptyBlocks a bitmap tracking which blocks are empty
  * @param avgSize average size of the non-empty blocks
+ * @param hugeBlockSizesArray sizes of huge blocks by their reduceId.
  */
 private[spark] class HighlyCompressedMapStatus private (
 private[this] var loc: BlockManagerId,
 private[this] var numNonEmptyBlocks: Int,
 private[this] var emptyBlocks: RoaringBitmap,
-private[this] var avgSize: Long)
+private[this] var avgSize: Long,
+private[this] var hugeBlockSizesArray: Array[Tuple2[Int, Byte]])
   extends MapStatus with Externalizable {
 
+  @transient var hugeBlockSizes: Map[Int, Byte] =
+if (hugeBlockSizesArray == null) null else hugeBlockSizesArray.toMap
+
   // loc could be null when the default constructor is called during deserialization
   require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
 "Average size can only be zero for map stages that produced no output")
 
-  protected def this() = this(null, -1, null, -1)  // For deserialization only
+  def this() = this(null, -1, null, -1, null)  // For deserialization only
 
   override def location: BlockManagerId = loc
 
   override def getSizeForBlock(reduceId: Int): Long = {
 if (emptyBlocks.contains(reduceId)) {
   0
 } else {
-  avgSize
+  hugeBlockSizes.get(reduceId) match {
+case Some(size) => MapStatus.decompressSize(size)
+case None => avgSize
+  }
 }
   }
 
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
 loc.writeExternal(out)
 emptyBlocks.writeExternal(out)
 out.writeLong(avgSize)
+out.writeObject(hugeBlockSizesArray)
   }
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
 loc = BlockManagerId(in)
 emptyBlocks = new RoaringBitmap()
 emptyBlocks.readExternal(in)
 avgSize = in.readLong()
+hugeBlockSizesArray = in.readObject().asInstanceOf[Array[Tuple2[Int, Byte]]]
+hugeBlockSizes = hugeBlockSizesArray.toMap
--- End diff --

After the default constructor is called, both `hugeBlockSizes` and 
`hugeBlockSizesArray` are null, so I think we need to initialize both in 
`readExternal` when deserializing.





[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...

2017-05-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r114959211
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java ---
@@ -126,4 +151,39 @@ private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
   }
 }
   }
+
+  private class DownloadCallback implements StreamCallback {
+
+private WritableByteChannel channel = null;
+private File targetFile = null;
+private int chunkIndex;
+
+public DownloadCallback(File targetFile, int chunkIndex) throws IOException {
+  this.targetFile = targetFile;
+  this.channel = Channels.newChannel(new FileOutputStream(targetFile));
+  this.chunkIndex = chunkIndex;
+}
+
+@Override
+public void onData(String streamId, ByteBuffer buf) throws IOException {
+  channel.write(buf);
+}
+
+@Override
+public void onComplete(String streamId) throws IOException {
+  channel.close();
+  ManagedBuffer buffer = new FileSegmentManagedBuffer(
--- End diff --

In the current implementation the files are removed when the app stops, 
since all the local dirs are removed. I tested this on my cluster :)





[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...

2017-05-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r114943530
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java ---
@@ -100,7 +114,14 @@ public void onSuccess(ByteBuffer response) {
   // Immediately request all chunks -- we expect that the total size of the request is
   // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
   for (int i = 0; i < streamHandle.numChunks; i++) {
-client.fetchChunk(streamHandle.streamId, i, chunkCallback);
+if (fetchToDisk) {
+  final File targetFile = new File(".",
--- End diff --

@cloud-fan 
Understood ~
I will refine this and replace `boolean fetchToDisk` with `Option<File[]> shuffleFilesOpt`.





[GitHub] spark issue #16989: [SPARK-19659] Fetch big blocks to disk when shuffle-read...

2017-05-05 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16989
  
Jenkins, retest this please.





[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...

2017-05-03 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r114696480
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java ---
@@ -100,7 +114,14 @@ public void onSuccess(ByteBuffer response) {
   // Immediately request all chunks -- we expect that the total size of the request is
   // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
   for (int i = 0; i < streamHandle.numChunks; i++) {
-client.fetchChunk(streamHandle.streamId, i, chunkCallback);
+if (fetchToDisk) {
+  final File targetFile = new File(".",
--- End diff --

@cloud-fan 
Yes, but `OneForOneBlockFetcher` is in the `network-shuffle` module, and I 
find it hard to import `SparkEnv` from the `core` module. Did I miss 
something? (Sorry if this is a stupid question.)





[GitHub] spark issue #17744: [SPARK-20426] Lazy initialization of FileSegmentManagedB...

2017-05-03 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17744
  
Thanks again for helping review this PR. Currently I'm not seeing memory 
issues on my NodeManagers. I'll report back to the community if there are 
new findings :)





[GitHub] spark issue #17744: [SPARK-20426] Lazy initialization of FileSegmentManagedB...

2017-05-03 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17744
  
@tgravescs 
Thanks a lot for merging.
I proposed resolving this with "Lazy initialization of 
FileSegmentManagedBuffer" to keep the change simple. But after checking the 
code, could we remove `OpenBlocks` altogether? In my understanding, 
`OpenBlocks` is just a handshake before fetching remote blocks and could 
conceptually be removed (did I miss something?). This is just brainstorming 
and would be a bigger change.





[GitHub] spark pull request #16989: [WIP][SPARK-19659] Fetch big blocks to disk when ...

2017-05-03 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r114504511
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
@@ -42,6 +46,12 @@ private[spark] class BlockStoreShuffleReader[K, C](
 
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
+val memMode =
--- End diff --

Yes, ideally this should be moved into `ShuffleBlockFetcherIterator`, but I 
didn't find a better implementation than
```
  extends MemoryConsumer(tmm, tmm.pageSizeBytes(),
    if (SparkTransportConf.fromSparkConf(SparkEnv.get.conf, "shuffle").preferDirectBufs()) {
      MemoryMode.OFF_HEAP
    } else {
      MemoryMode.ON_HEAP
    }
  )
```
And I'd be a little hesitant to expose a `setMode` on `MemoryConsumer`.
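
A sketch of the alternative shape being discussed (Spark-internal names as in the snippet above, so this assumes the usual imports inside core): decide the mode once in the caller and pass it to the consumer's constructor, rather than exposing a mutable `setMode` on `MemoryConsumer`.

```scala
// Computed once in BlockStoreShuffleReader (as the diff does), then handed down.
val memMode: MemoryMode =
  if (SparkTransportConf.fromSparkConf(SparkEnv.get.conf, "shuffle").preferDirectBufs()) {
    MemoryMode.OFF_HEAP
  } else {
    MemoryMode.ON_HEAP
  }
// ...then pass memMode into the ShuffleBlockFetcherIterator constructor.
```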





[GitHub] spark pull request #16989: [WIP][SPARK-19659] Fetch big blocks to disk when ...

2017-05-03 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r114503627
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala ---
@@ -133,36 +135,53 @@ private[spark] class HighlyCompressedMapStatus private (
 private[this] var loc: BlockManagerId,
 private[this] var numNonEmptyBlocks: Int,
 private[this] var emptyBlocks: RoaringBitmap,
-private[this] var avgSize: Long)
+private[this] var avgSize: Long,
+private[this] var hugeBlockSizes: HashMap[Int, Byte])
--- End diff --

Yes, I will refine.





[GitHub] spark pull request #16989: [WIP][SPARK-19659] Fetch big blocks to disk when ...

2017-05-03 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r114503557
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java ---
@@ -126,4 +147,38 @@ private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
   }
 }
   }
+
+  private class DownloadCallback implements StreamCallback {
+
+private WritableByteChannel channel = null;
+private File targetFile = null;
+private int chunkIndex;
+
+public DownloadCallback(File targetFile, int chunkIndex) throws IOException {
+  this.targetFile = targetFile;
+  this.channel = Channels.newChannel(new FileOutputStream(targetFile));
+  this.chunkIndex = chunkIndex;
+}
+
+@Override
+public void onData(String streamId, ByteBuffer buf) throws IOException {
+  channel.write(buf);
+}
+
+@Override
+public void onComplete(String streamId) throws IOException {
+  channel.close();
+  ManagedBuffer buffer = new FileSegmentManagedBuffer(
+transportConf, targetFile, 0, targetFile.length());
+  listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
+}
+
+@Override
+public void onFailure(String streamId, Throwable cause) throws IOException {
--- End diff --

Yes, that would be good!





[GitHub] spark pull request #16989: [WIP][SPARK-19659] Fetch big blocks to disk when ...

2017-05-03 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r114503489
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java ---
@@ -100,7 +114,14 @@ public void onSuccess(ByteBuffer response) {
   // Immediately request all chunks -- we expect that the total size of the request is
   // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
   for (int i = 0; i < streamHandle.numChunks; i++) {
-client.fetchChunk(streamHandle.streamId, i, chunkCallback);
+if (fetchToDisk) {
+  final File targetFile = new File(".",
--- End diff --

Yes, I wanted to use `DiskBlockManager.getFile`, but I found it's hard to 
import `DiskBlockManager` from `OneForOneBlockFetcher`.





[GitHub] spark issue #17634: [SPARK-20333] HashPartitioner should be compatible with ...

2017-04-26 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17634
  
@kayousterhout @mridulm
Does this PR make sense?
Could you please take a look when you have time? :)





[GitHub] spark pull request #17744: [SPARK-20426] Lazy initialization of FileSegmentM...

2017-04-25 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17744#discussion_r113356306
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java ---
@@ -93,14 +92,25 @@ protected void handleMessage(
 OpenBlocks msg = (OpenBlocks) msgObj;
 checkAuth(client, msg.appId);
 
-List<ManagedBuffer> blocks = Lists.newArrayList();
-long totalBlockSize = 0;
-for (String blockId : msg.blockIds) {
-  final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId, blockId);
-  totalBlockSize += block != null ? block.size() : 0;
-  blocks.add(block);
-}
-long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator());
+Iterator<ManagedBuffer> iter = new Iterator<ManagedBuffer>() {
+  private int index = 0;
+
+  @Override
+  public boolean hasNext() {
+return index < msg.blockIds.length;
+  }
+
+  @Override
+  public ManagedBuffer next() {
+final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId,
+  msg.blockIds[index]);
--- End diff --

@tgravescs 
Thanks a lot for taking the time to look into this :)
In my understanding, the `OpenBlocks` message is kept on the heap after 
initialization (https://github.com/apache/spark/blob/master/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java#L84).
Yes, `TransportRequestHandler.processRpcRequest` releases the `ByteBuf`, but 
the `OpenBlocks` object is not released.
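
The fix in the diff above boils down to replacing the eager list with a lazy iterator; a minimal sketch of that idea (the trait is a placeholder, not the Spark class):

```scala
object LazyBlocks {
  trait ManagedBuffer  // placeholder for org.apache.spark.network.buffer.ManagedBuffer

  // Nothing is materialized up front: the registered stream holds block IDs,
  // and one buffer object is created only when the stream reaches that chunk.
  def lazyBlocks(blockIds: Array[String],
                 getBlockData: String => ManagedBuffer): Iterator[ManagedBuffer] =
    blockIds.iterator.map(getBlockData)
}
```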





[GitHub] spark issue #17744: [SPARK-20426] Lazy initialization of FileSegmentManagedB...

2017-04-24 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17744
  
Spark jobs are running on a YARN cluster in my warehouse, and we enabled the 
external shuffle service (--conf spark.shuffle.service.enabled=true). 
Recently the NodeManager has been running OOM now and then. Dumping heap 
memory, we found that OneForOneStreamManager's footprint is huge: the 
NodeManager is configured with 5G heap memory, while OneForOneStreamManager 
costs 2.5G and there are 5,503,233 FileSegmentManagedBuffer objects.
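
(A rough check: 2.5G across 5,503,233 buffers is roughly 490 bytes per `FileSegmentManagedBuffer` entry, so the footprint really is dominated by this per-block metadata.)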





[GitHub] spark pull request #17744: [SPARK-20426] Lazy initialization of FileSegmentM...

2017-04-24 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/17744

[SPARK-20426] Lazy initialization of FileSegmentManagedBuffer for shuffle 
service.

## What changes were proposed in this pull request?
When an application contains a large number of shuffle blocks, the 
NodeManager requires lots of memory to keep the metadata 
(`FileSegmentManagedBuffer`) in `StreamManager`. When the number of shuffle 
blocks is big enough, the NodeManager can run OOM. This PR proposes lazy 
initialization of `FileSegmentManagedBuffer` in the shuffle service.

## How was this patch tested?

Manually tested.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-20426

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17744.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17744


commit 5fb91bfc5cdd588ae728a39173521279f517f20e
Author: jinxing <jinxing6...@126.com>
Date:   2017-04-24T12:52:00Z

[SPARK-20426] Lazy initialization of FileSegmentManagedBuffer for shuffle 
service.







[GitHub] spark pull request #16989: [WIP][SPARK-19659] Fetch big blocks to disk when ...

2017-04-17 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16989#discussion_r111734780
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala ---
@@ -133,36 +135,53 @@ private[spark] class HighlyCompressedMapStatus private (
 private[this] var loc: BlockManagerId,
 private[this] var numNonEmptyBlocks: Int,
 private[this] var emptyBlocks: RoaringBitmap,
-private[this] var avgSize: Long)
+private[this] var avgSize: Long,
+private[this] var hugeBlockSizes: HashMap[Int, Byte])
   extends MapStatus with Externalizable {
 
   // loc could be null when the default constructor is called during deserialization
   require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
 "Average size can only be zero for map stages that produced no output")
 
-  protected def this() = this(null, -1, null, -1)  // For deserialization only
+  def this() = this(null, -1, null, -1, null)  // For deserialization only
--- End diff --

Removed the `protected` to make this visible for tests.





[GitHub] spark issue #16989: [WIP][SPARK-19659] Fetch big blocks to disk when shuffle...

2017-04-17 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16989
  
Jenkins, test this please





[GitHub] spark issue #17533: [WIP][SPARK-20219] Schedule tasks based on size of input...

2017-04-14 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17533
  
I think the failed unit test can be fixed in 
https://github.com/apache/spark/pull/17634 and 
https://github.com/apache/spark/pull/17603





[GitHub] spark issue #17603: [SPARK-20288] Avoid generating the MapStatus by stageId ...

2017-04-14 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17603
  
@squito 
Could you help comment on this? :)





[GitHub] spark issue #17634: [SPARK-20333] HashPartitioner should be compatible with ...

2017-04-14 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17634
  
@squito @srowen 
Could you help comment on this? :)





[GitHub] spark issue #17533: [WIP][SPARK-20219] Schedule tasks based on size of input...

2017-04-14 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17533
  
@squito 
Thank you so much for reviewing thus far, and sorry for the complexity I 
brought in.
I've simplified the code according to your comments; please take another 
look once the tests pass. :)





[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...

2017-04-14 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17533#discussion_r111546462
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -168,6 +169,8 @@ private[spark] class TaskSetManager(
 t.epoch = epoch
   }
 
+  private val sortedPendingTasks = new AtomicBoolean(false)
--- End diff --

Yes, in the current change I do the ordering when creating the `TaskSet`, so 
there is no change in `TSM` now. Thanks a lot for the suggestion :)





[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...

2017-04-14 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17533#discussion_r111545406
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1080,6 +1122,25 @@ class DAGScheduler(
 }
   }
 
+  // Visible for testing.
+  private[spark] def getTaskInputSizesFromShuffledRDD(tasks: Seq[Task[_]]): Map[Task[_], Long] = {
+val taskInputSizeFromShuffledRDD = HashMap[Task[_], Long]()
+tasks.foreach {
+  case task =>
+val size =
+  parentSplitsInShuffledRDD(task.stageId, task.partitionId).map {
+case parentSplits =>
+  parentSplits.map {
+case (shuffleId, splits) =>
+  splits.map(mapOutputTracker.getMapSizesByExecutorId(shuffleId, _)
+    .flatMap(_._2.map(_._2)).sum).sum
--- End diff --

>It also occurs to me that in general this could use the total input size 
for the task, but I guess spark isn't looking at that in general yet (though it 
probably could, from hadoop's InputSplit.getLength()). Just something to keep 
in mind.

Agree :)





[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...

2017-04-14 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17533#discussion_r111545327
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1080,6 +1122,25 @@ class DAGScheduler(
 }
   }
 
+  // Visible for testing.
+  private[spark] def getTaskInputSizesFromShuffledRDD(tasks: Seq[Task[_]]): Map[Task[_], Long] = {
--- End diff --

Yes, it should be refined. :)





[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...

2017-04-14 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17533#discussion_r111545285
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -472,6 +472,47 @@ class DAGScheduler(
   }
 
   /**
+   * Get ancestor splits in ShuffledRDD.
+   */
+  private[spark] def parentSplitsInShuffledRDD(stageId: Int, pId: Int): Option[Map[Int, Set[Int]]] =
+  {
+stageIdToStage.get(stageId) match {
+  case Some(stage) =>
+val waitingForVisit = new Stack[Tuple2[RDD[_], Int]]
+waitingForVisit.push((stage.rdd, pId))
+val ret = new HashMap[Int, HashSet[Int]]()
+while(waitingForVisit.nonEmpty) {
+  val (rdd, split) = waitingForVisit.pop()
+  if (getCacheLocs(rdd)(split) == Nil) {
+rdd.dependencies.foreach {
+  case dep: ShuffleDependency[_, _, _] =>
+val noPartitionerConflict = rdd.partitioner match {
+  case Some(partitioner) =>
+partitioner.isInstanceOf[HashPartitioner] &&
+dep.partitioner.isInstanceOf[HashPartitioner] &&
+partitioner.numPartitions == dep.partitioner.numPartitions
--- End diff --

Yes, I always thought `rdd.partitioner` should be the same as the shuffle 
dependency's partitioner, but I found `CustomShuffledRDD` is a different case.





[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...

2017-04-14 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17533#discussion_r111545019
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -472,6 +472,47 @@ class DAGScheduler(
   }
 
   /**
+   * Get ancestor splits in ShuffledRDD.
+   */
+  private[spark] def parentSplitsInShuffledRDD(stageId: Int, pId: Int): Option[Map[Int, Set[Int]]] =
--- End diff --

Yes, this is confusing and I need to refine it.
I'm a little hesitant to use `getShuffleDependencies`. I need to get the 
total input size from the `ShuffledRDD` for every child partition, and after 
transformations like `CoalescedRDD` there may not be a consistent one-to-one 
match between an ancestor's partition index and the child's partition index.
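
A minimal sketch of that mismatch, assuming `sc` is a `SparkContext` (the numbers are made up):

```
// Parent ShuffledRDD with 8 shuffle partitions.
val shuffled = sc.parallelize(0 until 100).map(x => (x % 8, x)).reduceByKey(_ + _, 8)
// After coalesce(2), child partition 0 covers several parent partitions,
// so ancestor and child partition indices no longer match one-to-one.
val coalesced = shuffled.coalesce(2)
```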





[GitHub] spark issue #17634: [SPARK-20333] HashPartitioner should be compatible with ...

2017-04-13 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17634
  
I found this when working on https://github.com/apache/spark/pull/17533.





[GitHub] spark pull request #17634: [SPARK-20333] HashPartitioner should be compatibl...

2017-04-13 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/17634

[SPARK-20333] HashPartitioner should be compatible with num of child RDD's 
partitions.

## What changes were proposed in this pull request?

Fix the test "don't submit stage until its dependencies map outputs are 
registered (SPARK-5259)" in `DAGSchedulerSuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-20333

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17634.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17634


commit e9d82977b1bee6aa0573bd745985e97a81b8e514
Author: jinxing <jinxing6...@126.com>
Date:   2017-04-14T04:00:31Z

HashPartitioner should be compatible with num of child RDD's partitions.







[GitHub] spark issue #17603: [SPARK-20288] Avoid generating the MapStatus by stageId ...

2017-04-11 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17603
  
I found this when testing https://github.com/apache/spark/pull/17533. It 
failed now and then when trying to get the size of a reduce block from `MapStatus`.

I'm not sure which way is better: modify the test as in this pr, or change 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L380





[GitHub] spark pull request #17603: [SPARK-20288] Avoid generating the MapStatus by s...

2017-04-11 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/17603

[SPARK-20288] Avoid generating the MapStatus by stageId in 
BasicSchedulerIntegrationSuite

## What changes were proposed in this pull request?

The shuffleId is determined before the job is submitted, but it's hard to 
predict the stageId from the shuffleId.
Stages are created in DAGScheduler 
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L381),
 but the creation order is not determined.
I added a log (`println(s"Creating ShuffleMapStage-$id on 
shuffle-${shuffleDep.shuffleId}")`) after 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L331.
 When testing BasicSchedulerIntegrationSuite "multi-stage job", it will print either:
Creating ShuffleMapStage-0 on shuffle-0
Creating ShuffleMapStage-1 on shuffle-2
Creating ShuffleMapStage-2 on shuffle-1
Creating ShuffleMapStage-3 on shuffle-3
or:
Creating ShuffleMapStage-0 on shuffle-1
Creating ShuffleMapStage-1 on shuffle-3
Creating ShuffleMapStage-2 on shuffle-0
Creating ShuffleMapStage-3 on shuffle-2
It might be better to avoid generating the MapStatus by stageId.




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-20288

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17603.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17603


commit ea883f5d2b4bd63fae1f46bfe6ba78b352a894dc
Author: jinxing <jinxing6...@126.com>
Date:   2017-04-11T06:16:31Z

[SPARK-20288] Avoid generating the MapStatus by stageId in 
BasicSchedulerIntegrationSuite







[GitHub] spark issue #17533: [SPARK-20219] Schedule tasks based on size of input from...

2017-04-09 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17533
  
@squito
Thank you so much for taking a look at this.
> we don't want the TSM requesting info from the DAGScheduler

Sorry I missed this point in the previous change. Now I push the info (the size 
of input from the ShuffledRDD) when creating the TSM.
I also added a test in `DAGSchedulerSuite` to check that the sizes are 
computed correctly.
Thanks a lot again :) and I hope I understood your comment correctly.





[GitHub] spark issue #17533: [SPARK-20219] Schedule tasks based on size of input from...

2017-04-07 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17533
  
@kayousterhout 
Thanks a lot for comment and sorry for late reply. I replied your comment 
from JIRA. Please take a look when you have time :)





[GitHub] spark pull request #17533: [SPARK-20219] Schedule tasks based on size of inp...

2017-04-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17533#discussion_r109930532
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -512,6 +522,57 @@ private[spark] class TaskSetManager(
 }
   }
 
+  private[this] def sortPendingTasks(): Unit = {
+val taskIndexs = (0 until numTasks).toArray
+implicit def ord = new Ordering[Int] {
+  override def compare(x: Int, y: Int): Int =
+getTaskInputSizeFromShuffledRDD(tasks(x)) compare
+  getTaskInputSizeFromShuffledRDD(tasks(y))
+}
+if (tasks.nonEmpty) {
+  // Sort the tasks based on their input size from ShuffledRDD.
+  pendingTasksForExecutor.foreach {
+case (k, v) => pendingTasksForExecutor(k) = v.sorted
+  }
+  pendingTasksForHost.foreach {
+case (k, v) => pendingTasksForHost(k) = v.sorted
+  }
+  pendingTasksForRack.foreach {
+case (k, v) => pendingTasksForRack(k) = v.sorted
+  }
+  pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.sorted
+  allPendingTasks = allPendingTasks.sorted
+}
+  }
+
+  // Visible for testing
+  private[spark] def setTaskInputSizeFromShuffledRDD(inputSize: Map[Task[_], Long]) = {
+taskInputSizeFromShuffledRDD.clear()
+inputSize.foreach{
+  case (k, v) => taskInputSizeFromShuffledRDD(k) = v
+}
+  }
+
+  private[this] def getTaskInputSizeFromShuffledRDD(task: Task[_]): Long = {
+taskInputSizeFromShuffledRDD.get(task) match {
+  case Some(size) => size
+  case None =>
+val size =
+  sched.dagScheduler.parentSplitsInShuffledRDD(task.stageId, task.partitionId) match {
--- End diff --

I got it :)





[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...

2017-04-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17533#discussion_r109901087
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -512,6 +522,57 @@ private[spark] class TaskSetManager(
 }
   }
 
+  private[this] def sortPendingTasks(): Unit = {
+val taskIndexs = (0 until numTasks).toArray
+implicit def ord = new Ordering[Int] {
+  override def compare(x: Int, y: Int): Int =
+getTaskInputSizeFromShuffledRDD(tasks(x)) compare
+  getTaskInputSizeFromShuffledRDD(tasks(y))
+}
+if (tasks.nonEmpty) {
+  // Sort the tasks based on their input size from ShuffledRDD.
+  pendingTasksForExecutor.foreach {
+case (k, v) => pendingTasksForExecutor(k) = v.sorted
+  }
+  pendingTasksForHost.foreach {
+case (k, v) => pendingTasksForHost(k) = v.sorted
+  }
+  pendingTasksForRack.foreach {
+case (k, v) => pendingTasksForRack(k) = v.sorted
+  }
+  pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.sorted
+  allPendingTasks = allPendingTasks.sorted
+}
+  }
+
+  // Visible for testing
+  private[spark] def setTaskInputSizeFromShuffledRDD(inputSize: Map[Task[_], Long]) = {
+taskInputSizeFromShuffledRDD.clear()
+inputSize.foreach{
+  case (k, v) => taskInputSizeFromShuffledRDD(k) = v
+}
+  }
+
+  private[this] def getTaskInputSizeFromShuffledRDD(task: Task[_]): Long = {
+taskInputSizeFromShuffledRDD.get(task) match {
+  case Some(size) => size
+  case None =>
+val size =
+  sched.dagScheduler.parentSplitsInShuffledRDD(task.stageId, task.partitionId) match {
--- End diff --

Yes, this should be made clearer. But sorry, I didn't find a function like 
`getOrElse(func, 0L)`.
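
If the reviewer meant the standard `Option` combinators, the match could collapse to something like this sketch, where `computeSize` is a hypothetical stand-in for the body of the `Some` case:

```
val size: Long = sched.dagScheduler
  .parentSplitsInShuffledRDD(task.stageId, task.partitionId)
  .map(parentSplits => computeSize(parentSplits)) // hypothetical helper
  .getOrElse(0L)
```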





[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...

2017-04-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17533#discussion_r109900096
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -438,6 +443,11 @@ private[spark] class TaskSetManager(
 blacklist.isExecutorBlacklistedForTaskSet(execId)
 }
 if (!isZombie && !offerBlacklisted) {
+  if (!sortedPendingTasks.get()) {
--- End diff --

Yes, I should refine this :)





[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...

2017-04-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17533#discussion_r109900019
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -512,6 +522,57 @@ private[spark] class TaskSetManager(
 }
   }
 
+  private[this] def sortPendingTasks(): Unit = {
+val taskIndexs = (0 until numTasks).toArray
+implicit def ord = new Ordering[Int] {
--- End diff --

Yes, I think so:)





[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...

2017-04-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17533#discussion_r109896244
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -512,6 +522,57 @@ private[spark] class TaskSetManager(
 }
   }
 
+  private[this] def sortPendingTasks(): Unit = {
+val taskIndexs = (0 until numTasks).toArray
+implicit def ord = new Ordering[Int] {
+  override def compare(x: Int, y: Int): Int =
+getTaskInputSizeFromShuffledRDD(tasks(x)) compare
+  getTaskInputSizeFromShuffledRDD(tasks(y))
+}
+if (tasks.nonEmpty) {
+  // Sort the tasks based on their input size from ShuffledRDD.
+  pendingTasksForExecutor.foreach {
+case (k, v) => pendingTasksForExecutor(k) = v.sorted
+  }
+  pendingTasksForHost.foreach {
+case (k, v) => pendingTasksForHost(k) = v.sorted
+  }
+  pendingTasksForRack.foreach {
+case (k, v) => pendingTasksForRack(k) = v.sorted
+  }
+  pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.sorted
+  allPendingTasks = allPendingTasks.sorted
+}
+  }
+
+  // Visible for testing
+  private[spark] def setTaskInputSizeFromShuffledRDD(inputSize: Map[Task[_], Long]) = {
+taskInputSizeFromShuffledRDD.clear()
+inputSize.foreach{
--- End diff --

Yes, it should be refined :)





[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...

2017-04-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17533#discussion_r109893630
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -168,6 +169,10 @@ private[spark] class TaskSetManager(
 t.epoch = epoch
   }
 
+  val sortedPendingTasks = new AtomicBoolean(false)
--- End diff --

Yes, it should be.





[GitHub] spark pull request #17533: [WIP][SPARK-20219] Schedule tasks based on size o...

2017-04-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17533#discussion_r109877754
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -138,7 +139,7 @@ private[spark] class TaskSetManager(
   private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
 
   // Set containing all pending tasks (also used as a stack, as above).
-  private val allPendingTasks = new ArrayBuffer[Int]
+  private var allPendingTasks = new ArrayBuffer[Int]
--- End diff --

I made this a `var` because I don't have a better approach to sort an 
ArrayBuffer in place. Advice?
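
One possible workaround, sketched here under the assumption that overwriting the buffer's contents is acceptable: sort into a copy and write it back, so the field can stay a `val`:

```
import scala.collection.mutable.ArrayBuffer

// Sort an ArrayBuffer "in place" by overwriting it with its sorted copy.
def sortInPlace[T](buf: ArrayBuffer[T])(implicit ord: Ordering[T]): Unit = {
  val sorted = buf.sorted
  var i = 0
  while (i < buf.length) {
    buf(i) = sorted(i)
    i += 1
  }
}
```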





[GitHub] spark issue #17533: [WIP][SPARK-20219] Schedule tasks based on size of input...

2017-04-05 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17533
  
Yes, I did the test in my cluster. In a highly skewed stage, the time cost can 
be reduced significantly. Tasks are scheduled with locality preference, but in 
the current code the input sizes of tasks are not taken into consideration. 
Think about this scenario:
1. There are 9 partitions (0~8) in the ShuffledRDD, and the size of partition-8 
is 8 times that of each of the previous 8 partitions. (Let's assume that a 
task's time cost is linear in its input size, so each of the first 8 tasks 
costs 1 and the last task costs 8.)
2. Tasks are scheduled on 2 executors.

In the current code, the tasks are scheduled in serial order, so the task for 
partition-8 is the last one to launch and the total time cost is 12 (each 
executor finishes its 4 small tasks at time 4, then the big task runs from 4 to 12).
With this change, the task for partition-8 is scheduled first and the time 
cost is reduced to 8, as the sketch below illustrates.
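
A minimal sketch (plain Scala, not Spark code) of the arithmetic above, simulating a greedy assignment of tasks to whichever executor frees up first:

```
object SkewSchedulingSketch {
  // Each task goes to the executor that becomes idle first; returns the makespan.
  def makespan(taskCosts: Seq[Int], numExecutors: Int): Int = {
    val idleAt = Array.fill(numExecutors)(0)
    taskCosts.foreach { cost =>
      val i = idleAt.indexOf(idleAt.min)
      idleAt(i) += cost
    }
    idleAt.max
  }

  def main(args: Array[String]): Unit = {
    val costs = Seq.fill(8)(1) :+ 8          // partitions 0-7 cost 1, partition-8 costs 8
    println(makespan(costs, 2))              // serial order: 12
    println(makespan(costs.sortBy(-_), 2))   // largest-first: 8
  }
}
```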

This change is related to SPARK-19100. In my prod env, skew happens mostly on 
ShuffledRDD, so this pr proposes to consider the size of input from the 
ShuffledRDD when scheduling. It brings a benefit in skewed situations and 
shouldn't have a negative impact on performance in other scenarios.





[GitHub] spark pull request #17533: [SPARK-20219] Schedule tasks based on size of inp...

2017-04-04 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/17533

[SPARK-20219] Schedule tasks based on size of input from ShuffledRDD

## What changes were proposed in this pull request?

When data is highly skewed in a `ShuffledRDD`, it makes sense to launch the 
tasks that process much more input as soon as possible. The current scheduling 
mechanism in `TaskSetManager` is quite simple:
```
for (i <- (0 until numTasks).reverse) {
  addPendingTask(i)
}
```
In the scenario where "large tasks" sit in the bottom half of the tasks array, 
launching the tasks with much more input early can significantly reduce the 
time cost and save resources when "dynamic allocation" is disabled; a sketch 
of the proposed ordering follows below.
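
A minimal sketch of that ordering, assuming a hypothetical `inputSizeOf(i)` that returns task i's input size from the ShuffledRDD. Since the pending-task lists are used as stacks, tasks added last are launched first, so enqueueing in ascending size order launches the largest tasks first:

```
// Enqueue pending tasks in ascending input-size order; the largest
// tasks end up on top of the stack and are launched first.
(0 until numTasks).sortBy(i => inputSizeOf(i)).foreach(addPendingTask)
```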

## How was this patch tested?

Added a unit test in `TaskSetManagerSuite`.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-20219

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17533.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17533


commit bca146c531060f32fa351397e13f0f778fe35b25
Author: jinxing <jinxing6...@126.com>
Date:   2017-04-04T15:25:19Z

Sort tasks based on their size.

commit f757e4125935f7237a29bb07313ffb46ccbb3cd0
Author: jinxing <jinxing6...@126.com>
Date:   2017-04-05T03:06:41Z

Add unit test.







[GitHub] spark pull request #17112: [WIP] Measurement for SPARK-16929.

2017-04-03 Thread jinxing64
Github user jinxing64 closed the pull request at:

https://github.com/apache/spark/pull/17112





[GitHub] spark issue #17276: [SPARK-19937] Collect metrics of block sizes when shuffl...

2017-04-01 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17276
  
@mridulm 
Sorry for the late reply. I opened the pr for SPARK-19659 
(https://github.com/apache/spark/pull/16989) and made these two PRs 
independent. Basically this pr is to evaluate the performance (blocks are 
shuffled to disk) and stability (the sizes in `MapStatus` are inaccurate and 
OOM can happen) of the implementation proposed in SPARK-19659.
I'd be thankful if you have time to comment on these two PRs.





[GitHub] spark pull request #17276: [SPARK-19937] Collect metrics of block sizes when...

2017-03-26 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17276#discussion_r108061417
  
--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java ---
@@ -169,6 +173,36 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
       }
     }
     mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
+    if (mapStatus instanceof HighlyCompressedMapStatus) {
+      HighlyCompressedMapStatus hc = (HighlyCompressedMapStatus) mapStatus;
+      long underestimatedBlocksSize = 0L;
+      for (int i = 0; i < partitionLengths.length; i++) {
+        if (partitionLengths[i] > mapStatus.getSizeForBlock(i)) {
+          underestimatedBlocksSize += partitionLengths[i];
+        }
+      }
+      writeMetrics.incUnderestimatedBlocksSize(underestimatedBlocksSize);
+      if (logger.isDebugEnabled() && partitionLengths.length > 0) {
+        int underestimatedBlocksNum = 0;
+        // Distribution of sizes in MapStatus.
+        double[] cp = new double[partitionLengths.length];
+        for (int i = 0; i < partitionLengths.length; i++) {
+          cp[i] = partitionLengths[i];
+          if (partitionLengths[i] > mapStatus.getSizeForBlock(i)) {
+            underestimatedBlocksNum++;
+          }
+        }
+        Distribution distribution = new Distribution(cp, 0, cp.length);
+        double[] probabilities = {0.0, 0.25, 0.5, 0.75, 1.0};
+        String distributionStr = distribution.getQuantiles(probabilities).mkString(", ");
+        logger.debug("For task {}.{} in stage {} (TID {}), the block sizes in MapStatus are " +
+          "inaccurate (average is {}, {} blocks underestimated, size of underestimated is {})," +
+          " distribution at the given probabilities (0, 0.25, 0.5, 0.75, 1.0) is {}.",
+          taskContext.partitionId(), taskContext.attemptNumber(), taskContext.stageId(),
+          taskContext.taskAttemptId(), hc.getAvgSize(),
+          underestimatedBlocksNum, underestimatedBlocksSize, distributionStr);
+      }
+    }
--- End diff --

In `CompressedMapStatus`, the block sizes are accurate, so I would hesitate 
to add that log there.





[GitHub] spark issue #17276: [SPARK-19937] Collect metrics of block sizes when shuffl...

2017-03-26 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17276
  
@mridulm 
Thanks a lot for taking the time to look into this, and thanks for the comments :)
1) I changed the size of underestimated blocks to be 
`partitionLengths.filter(_ > hc.getAvgSize).map(_ - hc.getAvgSize).sum`
2) I added a method `genBlocksDistributionStr` and call it from the 
`ShuffleWriter`s, thus avoiding duplicated code
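
A tiny worked check of the expression in 1), with made-up numbers and `avgSize = 10`:

```
val partitionLengths = Seq(5L, 12L, 30L)
val avgSize = 10L
// Blocks larger than the average are the underestimated ones; sum how far
// each exceeds the average: (12 - 10) + (30 - 10) = 22.
val underestimatedSize = partitionLengths.filter(_ > avgSize).map(_ - avgSize).sum
```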





[GitHub] spark issue #17276: [WIP][SPARK-19937] Collect metrics of block sizes when s...

2017-03-25 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17276
  
@squito
Thanks a lot for taking the time to look into this pr.
I updated the pr. Currently it just adds two metrics: a) the total size of 
underestimated blocks, and b) the size of blocks shuffled to memory.
For a), the executor uses `maxBytesInFlight` to control the speed of 
shuffle-read. I agree with your comment that "another metric that may be nice to 
capture here is maximum underestimate". But think about this scenario: the 
maximum is small, but thousands of blocks are underestimated, so 
`maxBytesInFlight` cannot help avoid the OOM during shuffle-read. That's why I 
proposed tracking the total size of underestimated blocks.
For b), currently all data is shuffle-read into memory. If we add the 
feature of shuffling to disk when memory is short, we need to evaluate the 
performance. I think another two metrics need to be taken into account: the 
size of blocks shuffled to disk (to be added in another pr) and the task's 
running time (which already exists). The more data shuffled to memory, and the 
shorter the time cost, the better the performance.

I also added some debug logging in `ShuffleWriter`, including the number of 
underestimated blocks and the size distribution.





[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-03-24 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
@kayousterhout @squito @mridulm 
Thanks for reviewing this!





[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-03-23 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
@kayousterhout 
Thanks a lot for the comments. I refined the pr accordingly :)





[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.

2017-03-23 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17312
  
@rxin
Yes, I'm quite confused by the second screenshot I posted.
The only reason I can find is that the `stageData` in `ExecutorTable` is 
not thread safe: the size (2 executors) was returned, and maybe at that point 
a new executor was inserted.





[GitHub] spark issue #17276: [WIP][SPARK-19937] Collect metrics of block sizes when s...

2017-03-22 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17276
  
You are such a kind person.





[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.

2017-03-22 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17312
  
@rxin Because I killed executor 1, and it was not active during this stage.





[GitHub] spark issue #17276: [SPARK-19937] Collect metrics of block sizes when shuffl...

2017-03-22 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17276
  
@squito Oh, sorry if this is disturbing. I will mark it as WIP.





[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.

2017-03-22 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17312
  
@harounemohammedi 
Thanks a lot for commenting on this. I'm hesitant to include the `total time` 
in this pr.





[GitHub] spark issue #17276: [SPARK-19937] Collect metrics of block sizes when shuffl...

2017-03-21 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17276
  
@squito 
Thanks a lot for your comments; I will think them over and do the test 
carefully :)





[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-03-21 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
@kayousterhout Any more comments?





[GitHub] spark issue #16389: [SPARK-18981][Core]The job hang problem when speculation...

2017-03-21 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16389
  
@zhaorongsheng
I think it's better to just not reset `numRunningTasks` to 0. If we get an 
`ExecutorLostFailure`, the stage should not be marked as finished.





[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.

2017-03-21 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17312
  
I want to show the number of executors that were active at some point during 
the stage. `StageUIData` gets updated when a heartbeat is received from an executor.





[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.

2017-03-20 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17312
  
@rxin @jerryshao @srowen 
I've refined the description and uploaded a screenshot of the latest version. 
Please take another look.





[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.

2017-03-20 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17312
  
![screenshot](https://cloud.githubusercontent.com/assets/4058918/24134191/8392c5ea-0e3d-11e7-8a53-f164acf04764.png)






[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.

2017-03-20 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17312
  
Sure, that would be cool :) Thanks again for helping review this.





[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.

2017-03-20 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17312
  
@rxin @jerryshao 
I uploaded another screenshot and gave a short description there.
Now it shows "(2 executors supplied)".





[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.

2017-03-20 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17312
  


![screenshot2](https://cloud.githubusercontent.com/assets/4058918/24127926/5e0e7294-0e13-11e7-8af0-434b05e2815a.png)







[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.

2017-03-20 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17312
  
@jerryshao 
Thanks a lot for helping review; I really appreciate it. I will give a 
description soon.





[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-03-19 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
@squito 
Thanks :) already refined.





[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-03-19 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
@mridulm 
Thanks a lot for helping review this :) I really appreciate it.





[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.

2017-03-17 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17312
  
@rxin Thanks a lot. I added a number after `Aggregated Metrics by Executor`.





[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.

2017-03-17 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17312
  

![screenshot](https://cloud.githubusercontent.com/assets/4058918/24069386/0f556622-0be2-11e7-9f48-cc096cdd7d9b.png)






[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-03-17 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
@squito 
Sure. I did a test with 100k tasks. The results are as below:

| operation | time cost |
| -- | -- |
| insert | 135ms, 122ms, 119ms, 120ms, 163ms |
| `checkSpeculatableTasks` | 6ms, 6ms, 6ms, 5ms, 6ms |





[GitHub] spark issue #17276: [SPARK-19937] Collect metrics of block sizes when shuffl...

2017-03-16 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17276
  
@squito 
Would you mind commenting on this when you have time? :)





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-16 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106453502
  
--- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util.Arrays
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+val medianHeap = new MedianHeap()
+var valid = false
+try {
+  medianHeap.median
+} catch {
+  case e: NoSuchElementException =>
+valid = true
+}
+
+assert(valid)
+  }
+
+  test("Median should be correct when size of MedianHeap is even") {
+val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+assert(medianHeap.size() === 10)
+assert(medianHeap.median === ((array(4) + array(5)) / 2.0))
+  }
+
+  test("Median should be correct when size of MedianHeap is odd") {
+val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+assert(medianHeap.size() === 9)
+assert(medianHeap.median === (array(4)))
+  }
+
+  test("Size of Median should be correct though there are duplicated 
numbers inside.") {
+val array = Array(0, 0, 1, 1, 2, 2, 3, 3, 4, 4)
+val medianHeap = new MedianHeap()
+array.foreach(medianHeap.insert(_))
+Arrays.sort(array)
+assert(medianHeap.size === 10)
+assert(medianHeap.median === ((array(4) + array(5)) / 2.0))
+  }
--- End diff --

Yes, I added this change.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-16 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106453205
  
--- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util.Arrays
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+val medianHeap = new MedianHeap()
+var valid = false
+try {
+  medianHeap.median
+} catch {
+  case e: NoSuchElementException =>
+valid = true
+}
--- End diff --

Thanks a lot for the recommendation :)
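
Presumably the recommendation was ScalaTest's `intercept`, which fails the test if no exception is thrown; a sketch of the rewritten test:

```
test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
  val medianHeap = new MedianHeap()
  intercept[NoSuchElementException] {
    medianHeap.median
  }
}
```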





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-16 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106433273
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -172,7 +172,7 @@ private[spark] class TaskSchedulerImpl private[scheduler](
 
 if (!isLocal && conf.getBoolean("spark.speculation", false)) {
   logInfo("Starting speculative execution thread")
-  speculationScheduler.scheduleAtFixedRate(new Runnable {
+  speculationScheduler.scheduleWithFixedDelay(new Runnable {
--- End diff --

@squito 
Thanks a lot for looking into this. I will put the change in another pr :)





[GitHub] spark issue #17312: [SPARK-19973] Display num of executors for the stage.

2017-03-16 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17312
  
The executor metrics are updated in `StageUIData` when an executor heartbeat 
is received.
Yes, the executor's lifetime may not cover the whole stage, but it was active 
at some point during the stage.
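
For illustration, one way such a per-stage executor count could be derived
from listener events (a hedged sketch, not the PR's implementation):

    import scala.collection.mutable
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

    class StageExecutorCounter extends SparkListener {
      // stageId -> distinct executorIds that ran at least one task in the stage
      private val executorsPerStage = mutable.Map.empty[Int, mutable.Set[String]]

      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
        executorsPerStage
          .getOrElseUpdate(taskStart.stageId, mutable.Set.empty[String])
          .add(taskStart.taskInfo.executorId)
      }

      def numExecutors(stageId: Int): Int =
        executorsPerStage.get(stageId).map(_.size).getOrElse(0)
    }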





[GitHub] spark pull request #17312: [SPARK-19973] Display num of executors for the st...

2017-03-16 Thread jinxing64
GitHub user jinxing64 reopened a pull request:

https://github.com/apache/spark/pull/17312

[SPARK-19973] Display num of executors for the stage.

## What changes were proposed in this pull request?

In `StagePage` the total number of executors is not displayed. Since 
executorIds may not be consecutive, counting them by hand is awkward; 
displaying the total number is useful when checking how many executors were 
supplied during the stage.

## How was this patch tested?
Manually tested.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-19973

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17312.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17312


commit 7aa6fe53c58601285499fd94d7587df7a5ac7477
Author: jinxing <jinxing6...@126.com>
Date:   2017-03-16T10:09:48Z

[SPARK-19973] Display num of executors for the stage.







[GitHub] spark pull request #17312: [SPARK-19973] Display num of executors for the st...

2017-03-16 Thread jinxing64
Github user jinxing64 closed the pull request at:

https://github.com/apache/spark/pull/17312





[GitHub] spark issue #17312: Display num of executors for the stage.

2017-03-16 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17312
  
@srowen 
Thanks a lot for the quick reply. When we investigate why a stage ran much 
longer today than it did yesterday, we want to know how many executors were 
supplied. We don't want to count the executors one by one; a total number of 
executors makes sense here.





[GitHub] spark pull request #17312: Display num of executors for the stage.

2017-03-16 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/17312

Display num of executors for the stage.

## What changes were proposed in this pull request?

In `StagePage` the total number of executors is not displayed. Since 
executorIds may not be consecutive, counting them by hand is awkward; 
displaying the total number is useful when checking how many executors were 
supplied during the stage.

## How was this patch tested?
Manually tested.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-19973

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17312.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17312


commit f64691c0007408d95803019530c2352cecbf7c9e
Author: jinxing <jinxing6...@126.com>
Date:   2017-03-16T10:09:48Z

Display num of executors for the stage.







[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-03-15 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
@kayousterhout 
Thanks a lot for the comments :) very helpful.
I've refined the change; please take another look when you have time.





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-15 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106340513
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -893,6 +893,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 val taskSet = FakeTask.createTaskSet(4)
 // Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
 sc.conf.set("spark.speculation.multiplier", "0.0")
+sc.conf.set("spark.speculation", "true")
--- End diff --

This should be set, because the duration is inserted into `MedianHeap` only 
when `spark.speculation` is enabled (i.e. if I remove this, `MedianHeap` will 
be empty when `checkSpeculatableTasks` is called).
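
To illustrate the gating (a simplified assumption about `TaskSetManager`,
not the exact PR code; `insert` and `median` are taken from the MedianHeap
test suite):

    import org.apache.spark.util.collection.MedianHeap

    // Durations feed the heap only when speculation is enabled, so without
    // "spark.speculation" -> "true" the heap stays empty.
    class DurationTracker(speculationEnabled: Boolean) {
      private val successfulTaskDurations = new MedianHeap()

      def onTaskSuccess(durationMs: Long): Unit = {
        if (speculationEnabled) {
          successfulTaskDurations.insert(durationMs)
        }
      }

      def medianDuration: Double = successfulTaskDurations.median
    }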





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-15 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r106340321
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -172,7 +172,7 @@ private[spark] class TaskSchedulerImpl 
private[scheduler](
 
 if (!isLocal && conf.getBoolean("spark.speculation", false)) {
   logInfo("Starting speculative execution thread")
-  speculationScheduler.scheduleAtFixedRate(new Runnable {
+  speculationScheduler.scheduleWithFixedDelay(new Runnable {
--- End diff --

I was thinking that `checkSpeculatableTasks` synchronizes on 
`TaskSchedulerImpl`. If `checkSpeculatableTasks` doesn't finish within 100ms, 
the possibility exists for that thread to release and then immediately 
re-acquire the lock. Should this be included in this pr? 
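
For context, a standalone sketch of the difference between the two scheduling
modes (plain `java.util.concurrent`, not Spark code):

    import java.util.concurrent.{Executors, TimeUnit}

    val scheduler = Executors.newSingleThreadScheduledExecutor()
    // scheduleAtFixedRate: if a run overruns the 100ms period, the next run
    // starts as soon as the current one returns, so a lock-holding task can
    // re-acquire the lock immediately. scheduleWithFixedDelay: the next run
    // always starts 100ms after the previous one completes, leaving a window
    // for other threads to acquire the lock.
    scheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = {
        Thread.sleep(150) // stand-in for a checkSpeculatableTasks() that overruns
      }
    }, 1000, 100, TimeUnit.MILLISECONDS)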





[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-03-15 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
@kayousterhout @mridulm 
More comments on this? :)





[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-03-13 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
@squito 
Thanks a lot for the comments. I've refined :):)





[GitHub] spark pull request #17276: [WIP][SPARK-19937] Collect metrics of block sizes...

2017-03-13 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/17276

[WIP][SPARK-19937] Collect metrics of block sizes when shuffle.

## What changes were proposed in this pull request?

Metrics of block sizes (during shuffle) should be collected for later 
analysis. This is helpful when analyzing data skew or OOMs (even though 
maxBytesInFlight is set).

This pr proposes to:
1. Store the distribution of sizes in `MapStatus` by counting the block sizes 
in the ranges [0, 1k), [1k, 10k), [10k, 100k), [100k, 1m), [1m, 10m), 
[10m, 100m), [100m, 1g), [1g, 10g), [10g, Long.MaxValue); see the sketch 
after this list.
2. Record the inaccuracy of block sizes, because `HighlyCompressedMapStatus` 
is returned and only the average size is recorded when there are more than 
2000 blocks.
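
A minimal sketch of such range counting (an assumed simplification, taking
1k = 1000 bytes; the PR's actual code may differ):

    // Map a block size (bytes) to one of the nine decimal-decade ranges above.
    def bucket(size: Long): Int = {
      require(size >= 0, "block size must be non-negative")
      if (size < 1000L) 0
      else math.min(8, math.floor(math.log10(size.toDouble)).toInt - 2)
    }

    val histogram = new Array[Long](9)
    Seq(512L, 4000L, 123456789L).foreach { s => histogram(bucket(s)) += 1 }
    // histogram now records one block each in [0, 1k), [1k, 10k) and [100m, 1g)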

## How was this patch tested?




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-19937

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17276.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17276


commit 430ec95291393f49096fc07df98c15e700f23e8c
Author: jinxing <jinxing6...@126.com>
Date:   2017-03-13T13:35:51Z

[SPARK-19937] Collect metrics of block sizes when shuffle.







[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-03-10 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
@squito Sorry, it seems like something went wrong when I merged and tried to 
resolve the conflict. I squashed the commits and rebased. It seems ok now.





[GitHub] spark issue #17133: [SPARK-19793] Use clock.getTimeMillis when mark task as ...

2017-03-09 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17133
  
@vanzin @srowen 
I refined according to the comments; please take a look when you have time 
:)





[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-03-09 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
@kayousterhout @squito @mridulm 
I refined according to the comments. Please take a look when you have time :)





[GitHub] spark issue #17208: [SPARK-19868] conflict TasksetManager lead to spark stop...

2017-03-09 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17208
  
@squito 
Thanks for the notification :) this is not in my pr.




