[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-21 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128793623
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -375,6 +390,7 @@ final class ShuffleBlockFetcherIterator(
   result match {
 case r @ SuccessFetchResult(blockId, address, size, buf, 
isNetworkReqDone) =>
   if (address != blockManager.blockManagerId) {
+numBlocksInFlightPerAddress(address) = 
numBlocksInFlightPerAddress(address) - 1
--- End diff --

@cloud-fan filed a JIRA for this => 
https://issues.apache.org/jira/browse/SPARK-21500





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128668137
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -443,12 +459,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-// Send fetch requests up to maxBytesInFlight
-while (fetchRequests.nonEmpty &&
-  (bytesInFlight == 0 ||
-(reqsInFlight + 1 <= maxReqsInFlight &&
-  bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-  sendRequest(fetchRequests.dequeue())
+// Send fetch requests up to maxBytesInFlight. If you cannot fetch 
from a remote host
+// immediately, defer the request until the next time it can be 
processed.
+
+// Process any outstanding deferred fetch requests if possible.
+if (deferredFetchRequests.nonEmpty) {
+  for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
+while (isRemoteBlockFetchable(defReqQueue) &&
+!isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
+  val request = defReqQueue.dequeue()
+  logDebug(s"Processing deferred fetch request for $remoteAddress 
with "
++ s"${request.blocks.length} blocks")
+  send(remoteAddress, request)
+  if (defReqQueue.isEmpty) {
+deferredFetchRequests -= remoteAddress
--- End diff --

I see.





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128668114
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -443,12 +459,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-// Send fetch requests up to maxBytesInFlight
-while (fetchRequests.nonEmpty &&
-  (bytesInFlight == 0 ||
-(reqsInFlight + 1 <= maxReqsInFlight &&
-  bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-  sendRequest(fetchRequests.dequeue())
+// Send fetch requests up to maxBytesInFlight. If you cannot fetch 
from a remote host
+// immediately, defer the request until the next time it can be 
processed.
+
+// Process any outstanding deferred fetch requests if possible.
+if (deferredFetchRequests.nonEmpty) {
+  for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
+while (isRemoteBlockFetchable(defReqQueue) &&
+!isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
+  val request = defReqQueue.dequeue()
+  logDebug(s"Processing deferred fetch request for $remoteAddress 
with "
++ s"${request.blocks.length} blocks")
+  send(remoteAddress, request)
+  if (defReqQueue.isEmpty) {
+deferredFetchRequests -= remoteAddress
+  }
+}
+  }
+}
+
+// Process any regular fetch requests if possible.
+while (isRemoteBlockFetchable(fetchRequests)) {
+  val request = fetchRequests.dequeue()
+  val remoteAddress = request.address
+  if (isRemoteAddressMaxedOut(remoteAddress, request)) {
+logDebug(s"Deferring fetch request for $remoteAddress with 
${request.blocks.size} blocks")
+val defReqQueue = deferredFetchRequests.getOrElse(remoteAddress, 
new Queue[FetchRequest]())
+defReqQueue.enqueue(request)
+deferredFetchRequests(remoteAddress) = defReqQueue
--- End diff --

you are right





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128668000
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -375,6 +390,7 @@ final class ShuffleBlockFetcherIterator(
   result match {
 case r @ SuccessFetchResult(blockId, address, size, buf, 
isNetworkReqDone) =>
   if (address != blockManager.blockManagerId) {
+numBlocksInFlightPerAddress(address) = 
numBlocksInFlightPerAddress(address) - 1
--- End diff --

Yeah, sure.





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128667967
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -321,6 +321,17 @@ package object config {
   .intConf
   .createWithDefault(3)
 
+  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
+ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
+  .doc("This configuration limits the number of remote blocks being 
fetched per reduce task" +
+" from a given host port. When a large number of blocks are being 
requested from a given" +
+" address in a single fetch or simultaneously, this could crash 
the serving executor or" +
+" Node Manager. This is especially useful to reduce the load on 
the Node Manager when" +
--- End diff --

I think Node Manager is for YARN only? Shuffle service is more general





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-20 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128572125
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -375,6 +390,7 @@ final class ShuffleBlockFetcherIterator(
   result match {
 case r @ SuccessFetchResult(blockId, address, size, buf, 
isNetworkReqDone) =>
   if (address != blockManager.blockManagerId) {
+numBlocksInFlightPerAddress(address) = 
numBlocksInFlightPerAddress(address) - 1
--- End diff --

That is a good point. In fact, we could also move the other bookkeeping
stuff right after the fetch result is enqueued.

I would also want to look at the initialization of the BlockFetchingListener
to see the effects of this, as it would increase the size of the closure.
Can we have a separate JIRA filed for this?
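
A minimal, self-contained sketch of what moving the decrement into the fetch
callback would look like (hypothetical names; this is not Spark's actual
BlockFetchingListener or result queue). The closure-size concern is that the
listener now has to capture the counter map and the address on top of whatever
it already captures to enqueue results:

    import scala.collection.mutable

    // Hypothetical stand-in for the real listener interface.
    trait FetchListener {
      def onBlockFetchSuccess(blockId: String): Unit
      def onBlockFetchFailure(blockId: String, e: Throwable): Unit
    }

    def makeListener(
        address: String,
        numBlocksInFlightPerAddress: mutable.HashMap[String, Int],
        enqueueResult: String => Unit): FetchListener = new FetchListener {

      override def onBlockFetchSuccess(blockId: String): Unit = {
        enqueueResult(blockId)
        // Eager bookkeeping: decrement as soon as the result is enqueued,
        // rather than when the iterator later consumes it.
        numBlocksInFlightPerAddress.synchronized {
          numBlocksInFlightPerAddress(address) =
            numBlocksInFlightPerAddress(address) - 1
        }
      }

      override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = ()
    }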





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-20 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128573651
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -321,6 +321,17 @@ package object config {
   .intConf
   .createWithDefault(3)
 
+  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
+ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
+  .doc("This configuration limits the number of remote blocks being 
fetched per reduce task" +
+" from a given host port. When a large number of blocks are being 
requested from a given" +
+" address in a single fetch or simultaneously, this could crash 
the serving executor or" +
+" Node Manager. This is especially useful to reduce the load on 
the Node Manager when" +
--- End diff --

If the shuffle service fails, it can take down the Node Manager, which is
more severe, and hence I have used it. And in the following sentence I have
mentioned external shuffle. If it is not clear, I am okay with changing it.





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-20 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128559318
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -443,12 +459,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-// Send fetch requests up to maxBytesInFlight
-while (fetchRequests.nonEmpty &&
-  (bytesInFlight == 0 ||
-(reqsInFlight + 1 <= maxReqsInFlight &&
-  bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-  sendRequest(fetchRequests.dequeue())
+// Send fetch requests up to maxBytesInFlight. If you cannot fetch 
from a remote host
+// immediately, defer the request until the next time it can be 
processed.
+
+// Process any outstanding deferred fetch requests if possible.
+if (deferredFetchRequests.nonEmpty) {
+  for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
+while (isRemoteBlockFetchable(defReqQueue) &&
+!isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
+  val request = defReqQueue.dequeue()
+  logDebug(s"Processing deferred fetch request for $remoteAddress 
with "
++ s"${request.blocks.length} blocks")
+  send(remoteAddress, request)
+  if (defReqQueue.isEmpty) {
+deferredFetchRequests -= remoteAddress
+  }
+}
+  }
+}
+
+// Process any regular fetch requests if possible.
+while (isRemoteBlockFetchable(fetchRequests)) {
+  val request = fetchRequests.dequeue()
+  val remoteAddress = request.address
+  if (isRemoteAddressMaxedOut(remoteAddress, request)) {
+logDebug(s"Deferring fetch request for $remoteAddress with 
${request.blocks.size} blocks")
+val defReqQueue = deferredFetchRequests.getOrElse(remoteAddress, 
new Queue[FetchRequest]())
+defReqQueue.enqueue(request)
+deferredFetchRequests(remoteAddress) = defReqQueue
--- End diff --

If it is the first time that we want to defer a request, `defReqQueue` has
to be associated with its corresponding `remoteAddress`.
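
A tiny, self-contained illustration of why that first-time association is
needed (element type simplified to Int; the real code queues FetchRequest
objects): `getOrElse` with a fresh `Queue()` default returns a queue that is
not yet stored in the map, so it has to be written back explicitly, whereas
`getOrElseUpdate` would fold the lookup and the insertion into one step.

    import scala.collection.mutable
    import scala.collection.mutable.Queue

    val deferred = mutable.HashMap[String, Queue[Int]]()

    // First deferral for this address: the default queue is NOT in the map yet...
    val q1 = deferred.getOrElse("host1:7337", new Queue[Int]())
    q1.enqueue(1)
    assert(!deferred.contains("host1:7337"))
    deferred("host1:7337") = q1  // ...so the explicit association is required.

    // getOrElseUpdate stores the freshly created queue, so no write-back is needed.
    val q2 = deferred.getOrElseUpdate("host2:7337", new Queue[Int]())
    q2.enqueue(2)
    assert(deferred("host2:7337").front == 2)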





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-20 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128557269
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -443,12 +459,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-// Send fetch requests up to maxBytesInFlight
-while (fetchRequests.nonEmpty &&
-  (bytesInFlight == 0 ||
-(reqsInFlight + 1 <= maxReqsInFlight &&
-  bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-  sendRequest(fetchRequests.dequeue())
+// Send fetch requests up to maxBytesInFlight. If you cannot fetch 
from a remote host
+// immediately, defer the request until the next time it can be 
processed.
+
+// Process any outstanding deferred fetch requests if possible.
+if (deferredFetchRequests.nonEmpty) {
+  for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
+while (isRemoteBlockFetchable(defReqQueue) &&
+!isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
+  val request = defReqQueue.dequeue()
+  logDebug(s"Processing deferred fetch request for $remoteAddress 
with "
++ s"${request.blocks.length} blocks")
+  send(remoteAddress, request)
+  if (defReqQueue.isEmpty) {
+deferredFetchRequests -= remoteAddress
--- End diff --

We would then unnecessarily iterate through the map over all the block manager
ids for which we deferred fetch requests at an earlier point, only to find that
they have no pending fetch requests left.





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128410279
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -443,12 +459,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-// Send fetch requests up to maxBytesInFlight
-while (fetchRequests.nonEmpty &&
-  (bytesInFlight == 0 ||
-(reqsInFlight + 1 <= maxReqsInFlight &&
-  bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-  sendRequest(fetchRequests.dequeue())
+// Send fetch requests up to maxBytesInFlight. If you cannot fetch 
from a remote host
+// immediately, defer the request until the next time it can be 
processed.
+
+// Process any outstanding deferred fetch requests if possible.
+if (deferredFetchRequests.nonEmpty) {
+  for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
+while (isRemoteBlockFetchable(defReqQueue) &&
+!isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
+  val request = defReqQueue.dequeue()
+  logDebug(s"Processing deferred fetch request for $remoteAddress 
with "
++ s"${request.blocks.length} blocks")
+  send(remoteAddress, request)
+  if (defReqQueue.isEmpty) {
+deferredFetchRequests -= remoteAddress
+  }
+}
+  }
+}
+
+// Process any regular fetch requests if possible.
+while (isRemoteBlockFetchable(fetchRequests)) {
+  val request = fetchRequests.dequeue()
+  val remoteAddress = request.address
+  if (isRemoteAddressMaxedOut(remoteAddress, request)) {
+logDebug(s"Deferring fetch request for $remoteAddress with 
${request.blocks.size} blocks")
+val defReqQueue = deferredFetchRequests.getOrElse(remoteAddress, 
new Queue[FetchRequest]())
+defReqQueue.enqueue(request)
+deferredFetchRequests(remoteAddress) = defReqQueue
--- End diff --

the `defReqQueue` is mutable, so we don't need to do this.





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128410233
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -443,12 +459,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-// Send fetch requests up to maxBytesInFlight
-while (fetchRequests.nonEmpty &&
-  (bytesInFlight == 0 ||
-(reqsInFlight + 1 <= maxReqsInFlight &&
-  bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-  sendRequest(fetchRequests.dequeue())
+// Send fetch requests up to maxBytesInFlight. If you cannot fetch 
from a remote host
+// immediately, defer the request until the next time it can be 
processed.
+
+// Process any outstanding deferred fetch requests if possible.
+if (deferredFetchRequests.nonEmpty) {
+  for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
+while (isRemoteBlockFetchable(defReqQueue) &&
+!isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
+  val request = defReqQueue.dequeue()
+  logDebug(s"Processing deferred fetch request for $remoteAddress 
with "
++ s"${request.blocks.length} blocks")
+  send(remoteAddress, request)
+  if (defReqQueue.isEmpty) {
+deferredFetchRequests -= remoteAddress
--- End diff --

we can leave the empty queue here, as we may still have fetch requests to 
put in this queue.





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128409414
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -375,6 +390,7 @@ final class ShuffleBlockFetcherIterator(
   result match {
 case r @ SuccessFetchResult(blockId, address, size, buf, 
isNetworkReqDone) =>
   if (address != blockManager.blockManagerId) {
+numBlocksInFlightPerAddress(address) = 
numBlocksInFlightPerAddress(address) - 1
--- End diff --

can we do this earlier? e.g. right after the fetch result is enqueued to 
`results`.





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128408952
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -321,6 +321,17 @@ package object config {
   .intConf
   .createWithDefault(3)
 
+  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
+ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
+  .doc("This configuration limits the number of remote blocks being 
fetched per reduce task" +
+" from a given host port. When a large number of blocks are being 
requested from a given" +
+" address in a single fetch or simultaneously, this could crash 
the serving executor or" +
+" Node Manager. This is especially useful to reduce the load on 
the Node Manager when" +
--- End diff --

shall we say `shuffle service` instead of `Node Manager`?





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18487





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-19 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128257328
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -321,6 +321,17 @@ package object config {
   .intConf
   .createWithDefault(3)
 
+  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
+ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
+  .doc("This configuration limits the number of remote blocks being 
fetched per reduce task" +
+" from a given host port. When a large number of blocks are being 
requested from a given" +
+" address in a single fetch or simultaneously, this could crash 
the serving executor or" +
+" Node Manager. This is especially useful to reduce the load on 
the Node Manager when" +
+" external shuffle is enabled. You can mitigate the issue by 
setting it to a lower value.")
+  .intConf
+  .checkValue(_ > 0, "The max no. of blocks in flight cannot be 
non-positive.")
+  .createWithDefault(Int.MaxValue)
--- End diff --

I'm fine leaving it at max value for now to not change current behavior, just
like we have done with some of these other related configs. I would like to
get more runtime on this in production and then we can set it later, perhaps
in 2.3. It would be nice to pull this back into branch-2.2 as well as master.





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128145633
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -321,6 +321,17 @@ package object config {
   .intConf
   .createWithDefault(3)
 
+  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
+ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
+  .doc("This configuration limits the number of remote blocks being 
fetched per reduce task" +
+" from a given host port. When a large number of blocks are being 
requested from a given" +
+" address in a single fetch or simultaneously, this could crash 
the serving executor or" +
+" Node Manager. This is especially useful to reduce the load on 
the Node Manager when" +
+" external shuffle is enabled. You can mitigate the issue by 
setting it to a lower value.")
+  .intConf
+  .checkValue(_ > 0, "The max no. of blocks in flight cannot be 
non-positive.")
+  .createWithDefault(Int.MaxValue)
--- End diff --

cc @tgravescs shall we change the default value to 20 or something?





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-18 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r127998722
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -277,11 +290,13 @@ final class ShuffleBlockFetcherIterator(
   } else if (size < 0) {
 throw new BlockException(blockId, "Negative block size " + 
size)
   }
-  if (curRequestSize >= targetRequestSize) {
+  if (curRequestSize >= targetRequestSize ||
+  curBlocks.size >= maxBlocksInFlightPerAddress) {
--- End diff --

We are already doing it here => 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L330
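
For context, a stand-alone sketch of that idea under simplified assumptions
(FetchRequestSketch is a made-up stand-in for the real FetchRequest, and Spark
uses its own Utils.randomize helper rather than Random.shuffle): the remote
requests are shuffled before they are queued, so adjacent entries in the queue
are unlikely to all target the same host.

    import scala.collection.mutable.Queue
    import scala.util.Random

    case class FetchRequestSketch(address: String, numBlocks: Int)

    val remoteRequests = Seq(
      FetchRequestSketch("host1:7337", 20),
      FetchRequestSketch("host1:7337", 20),
      FetchRequestSketch("host2:7337", 20))

    // Randomize the order before queuing, as the linked line already does.
    val fetchRequests = new Queue[FetchRequestSketch]()
    fetchRequests ++= Random.shuffle(remoteRequests)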





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r127885748
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -277,11 +290,13 @@ final class ShuffleBlockFetcherIterator(
   } else if (size < 0) {
 throw new BlockException(blockId, "Negative block size " + 
size)
   }
-  if (curRequestSize >= targetRequestSize) {
+  if (curRequestSize >= targetRequestSize ||
+  curBlocks.size >= maxBlocksInFlightPerAddress) {
--- End diff --

We may have a lot of adjacent fetch requests in the queue; shall we shuffle
the request queue before fetching?





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-14 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r127490634
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -110,12 +113,21 @@ final class ShuffleBlockFetcherIterator(
*/
   private[this] val fetchRequests = new Queue[FetchRequest]
 
+  /**
+   * Queue of fetch requests which could not be issued the first time they 
were dequed. These
--- End diff --

s/dequed/dequeued/





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-14 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r127490147
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -321,6 +321,17 @@ package object config {
   .intConf
   .createWithDefault(3)
 
+  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
+ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
+  .doc("This configuration limits the number of remote blocks being 
fetched per reduce task" +
+" from a given host port. When a large number of blocks are being 
requested from a given" +
+" address in a single fetch or simultaneously, this could crash 
the serving executor or" +
+" Node Manager. This is especially useful to reduce the load on 
the Node Manager when" +
+"external shuffle is enabled. You can mitigate the issue by 
setting it to a lower value.")
--- End diff --

space before external





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-09 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r126305092
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -433,12 +449,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-// Send fetch requests up to maxBytesInFlight
-while (fetchRequests.nonEmpty &&
-  (bytesInFlight == 0 ||
-(reqsInFlight + 1 <= maxReqsInFlight &&
-  bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-  sendRequest(fetchRequests.dequeue())
+// Send fetch requests up to maxBytesInFlight. If you cannot fetch 
from a remote host
+// immediately, defer the request until the next time it can be 
processed.
+
+// Process any outstanding deferred fetch requests if possible.
+if (deferredFetchRequests.nonEmpty) {
+  for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
--- End diff --

Ah, makes sense.





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-09 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r126303433
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -433,12 +449,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-// Send fetch requests up to maxBytesInFlight
-while (fetchRequests.nonEmpty &&
-  (bytesInFlight == 0 ||
-(reqsInFlight + 1 <= maxReqsInFlight &&
-  bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-  sendRequest(fetchRequests.dequeue())
+// Send fetch requests up to maxBytesInFlight. If you cannot fetch 
from a remote host
+// immediately, defer the request until the next time it can be 
processed.
+
+// Process any outstanding deferred fetch requests if possible.
+if (deferredFetchRequests.nonEmpty) {
+  for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
--- End diff --

Oh yes. That was the first choice, and I gave it a try to avoid adding any
extra bookkeeping. There are issues with that approach. Say you have a request
which has to be deferred: you just remove it, push it at the end of the queue,
and continue.
* This is fine as long as you don't encounter the deferred request again.
* If you do encounter the deferred request again, it may or may not be
schedulable, depending on whether the remote has finished processing its
earlier request. This can lead to going around in circles (wasted effort). To
avoid that we have to know when to stop, so we would have to keep a marker for
the request that was already deferred. But this marker would apply only to a
single request, which corresponds to one remote. In the meantime other remotes
could have finished processing their earlier requests and we could schedule
requests to them, so we can no longer stop at the first marker for a single
address; we would have to check the requests again.

This makes it more complicated than scheduling everything that's possible in a
single shot and deferring whatever it encounters on the way. The next time
around we first clear any backlog from the previous run and, after doing so,
proceed normally.
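
A condensed, self-contained sketch of the chosen approach (types simplified to
strings and ints; the real code works with FetchRequest and BlockManagerId and
also gates on bytes and requests in flight): drain the per-address deferred
queues first, then go through the main queue once, deferring anything whose
address is currently maxed out. No markers are needed because deferred work
lives in its own per-address queues.

    import scala.collection.mutable
    import scala.collection.mutable.Queue

    class DeferralSketch(maxBlocksPerAddress: Int) {
      // (address, number of blocks) stands in for a FetchRequest.
      val fetchRequests = Queue[(String, Int)]()
      val deferredFetchRequests = mutable.HashMap[String, Queue[(String, Int)]]()
      private val blocksInFlightPerAddress = mutable.HashMap[String, Int]()

      private def maxedOut(addr: String, blocks: Int): Boolean =
        blocksInFlightPerAddress.getOrElse(addr, 0) + blocks > maxBlocksPerAddress

      private def send(addr: String, blocks: Int): Unit =
        blocksInFlightPerAddress(addr) =
          blocksInFlightPerAddress.getOrElse(addr, 0) + blocks

      def fetchUpToMax(): Unit = {
        // Pass 1: clear any backlog deferred on an earlier call.
        for ((addr, queue) <- deferredFetchRequests.toSeq) {
          while (queue.nonEmpty && !maxedOut(addr, queue.front._2)) {
            send(addr, queue.dequeue()._2)
          }
          if (queue.isEmpty) deferredFetchRequests -= addr
        }
        // Pass 2: one pass over the main queue, deferring maxed-out addresses.
        while (fetchRequests.nonEmpty) {
          val (addr, blocks) = fetchRequests.dequeue()
          if (maxedOut(addr, blocks)) {
            deferredFetchRequests
              .getOrElseUpdate(addr, Queue[(String, Int)]())
              .enqueue((addr, blocks))
          } else {
            send(addr, blocks)
          }
        }
      }
    }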





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-09 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r126299726
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -433,12 +449,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-// Send fetch requests up to maxBytesInFlight
-while (fetchRequests.nonEmpty &&
-  (bytesInFlight == 0 ||
-(reqsInFlight + 1 <= maxReqsInFlight &&
-  bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-  sendRequest(fetchRequests.dequeue())
+// Send fetch requests up to maxBytesInFlight. If you cannot fetch 
from a remote host
+// immediately, defer the request until the next time it can be 
processed.
+
+// Process any outstanding deferred fetch requests if possible.
+if (deferredFetchRequests.nonEmpty) {
+  for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
--- End diff --

I was thinking maybe we can avoid adding the extra `deferredFetchRequests` to
handle deferred fetch requests; instead we can iterate over `fetchRequests` and
send the requests that are not maxed out. This way we may simplify the logic.
Would you like to try?





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-07 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r126217282
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -433,12 +449,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-// Send fetch requests up to maxBytesInFlight
-while (fetchRequests.nonEmpty &&
-  (bytesInFlight == 0 ||
-(reqsInFlight + 1 <= maxReqsInFlight &&
-  bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-  sendRequest(fetchRequests.dequeue())
+// Send fetch requests up to maxBytesInFlight. If you cannot fetch 
from a remote host
+// immediately, defer the request until the next time it can be 
processed.
+
+// Process any outstanding deferred fetch requests if possible.
+if (deferredFetchRequests.nonEmpty) {
+  for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
--- End diff --

I didn't get you. We are just iterating to check if there are requests that
can be scheduled, and they are handled asynchronously by the send calls. What
effort are you referring to?





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-07 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r126216782
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -321,6 +321,16 @@ package object config {
   .intConf
   .createWithDefault(3)
 
+  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
+ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
+  .doc("This configuration limits the number of remote blocks being 
fetched from a given " +
+" host port at any given point. When external shuffle is enabled 
and a large number of " +
--- End diff --

okay.





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-07 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r126163029
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -321,6 +321,16 @@ package object config {
   .intConf
   .createWithDefault(3)
 
+  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
+ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
+  .doc("This configuration limits the number of remote blocks being 
fetched from a given " +
+" host port at any given point. When external shuffle is enabled 
and a large number of " +
--- End diff --

At least we should state that the configuration doesn't necessarily go with
the external shuffle service.





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-07 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r126166024
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -433,12 +449,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-// Send fetch requests up to maxBytesInFlight
-while (fetchRequests.nonEmpty &&
-  (bytesInFlight == 0 ||
-(reqsInFlight + 1 <= maxReqsInFlight &&
-  bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-  sendRequest(fetchRequests.dequeue())
+// Send fetch requests up to maxBytesInFlight. If you cannot fetch 
from a remote host
+// immediately, defer the request until the next time it can be 
processed.
+
+// Process any outstanding deferred fetch requests if possible.
+if (deferredFetchRequests.nonEmpty) {
+  for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
--- End diff --

If the traffic is heavy, it may take some time to finish the iteration; do
you have any ideas on reducing the effort?





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-06 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r125945263
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -321,6 +321,16 @@ package object config {
   .intConf
   .createWithDefault(3)
 
+  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
+ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
--- End diff --

@jinxing64 After your fix for lazily loading the open blocks iterator, I am
not seeing issues with the NM crashing on my end. However, requests made with
a high no. of blocks that were still under the max constraints caused
increased load. This is an added layer of defense which can mitigate the issue.





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-06 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r12599
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -321,6 +321,16 @@ package object config {
   .intConf
   .createWithDefault(3)
 
+  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
+ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
+  .doc("This configuration limits the number of remote blocks being 
fetched from a given " +
+" host port at any given point. When external shuffle is enabled 
and a large number of " +
--- End diff --

In this case it would take down either the NM or the executor serving the map
output tasks, depending on the shuffle mode. We emphasize the external shuffle
case as crashing the NM is more severe than losing an executor. I am open to
re-wording it so that it is easier to understand.





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-06 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r125943895
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -433,12 +449,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-// Send fetch requests up to maxBytesInFlight
-while (fetchRequests.nonEmpty &&
-  (bytesInFlight == 0 ||
-(reqsInFlight + 1 <= maxReqsInFlight &&
-  bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-  sendRequest(fetchRequests.dequeue())
+// Send fetch requests up to maxBytesInFlight. If you cannot fetch 
from a remote host
+// immediately, defer the request until the next time it can be 
processed.
+
+// Process any outstanding deferred fetch requests if possible.
+if (deferredFetchRequests.nonEmpty) {
+  for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
+while (isRemoteBlockFetchable(defReqQueue) &&
+!isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
--- End diff --


https://github.com/apache/spark/pull/18487/files/d60a0bef35e1c38108d3b18a40b7dc01ed8b814f#diff-27109eb30a77542d377c936e0d134420R284





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-06 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r125942675
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -433,12 +449,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-// Send fetch requests up to maxBytesInFlight
-while (fetchRequests.nonEmpty &&
-  (bytesInFlight == 0 ||
-(reqsInFlight + 1 <= maxReqsInFlight &&
-  bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-  sendRequest(fetchRequests.dequeue())
+// Send fetch requests up to maxBytesInFlight. If you cannot fetch 
from a remote host
+// immediately, defer the request until the next time it can be 
processed.
+
+// Process any outstanding deferred fetch requests if possible.
+if (deferredFetchRequests.nonEmpty) {
+  for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
+while (isRemoteBlockFetchable(defReqQueue) &&
+!isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
--- End diff --

We check the no. of blocks being added to a fetch request. If it exceeds the
configured limit, we create a new request.
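
A minimal stand-alone version of that splitting rule (simplified block
representation; not the PR's actual code, which also handles zero-size and
negative-size blocks): a new request is cut whenever the accumulated size
reaches targetRequestSize or the accumulated block count reaches
maxBlocksInFlightPerAddress.

    // block = (blockId, size in bytes)
    def splitIntoRequests(
        blocks: Seq[(String, Long)],
        targetRequestSize: Long,
        maxBlocksInFlightPerAddress: Int): Seq[Seq[(String, Long)]] = {
      val requests = Seq.newBuilder[Seq[(String, Long)]]
      var curBlocks = Vector.empty[(String, Long)]
      var curRequestSize = 0L
      for (block @ (_, size) <- blocks) {
        curBlocks :+= block
        curRequestSize += size
        // Cut a new request on either limit, mirroring the condition in the diff above.
        if (curRequestSize >= targetRequestSize ||
            curBlocks.size >= maxBlocksInFlightPerAddress) {
          requests += curBlocks
          curBlocks = Vector.empty
          curRequestSize = 0L
        }
      }
      if (curBlocks.nonEmpty) requests += curBlocks
      requests.result()
    }

Passing maxBlocksInFlightPerAddress = Int.MaxValue reproduces the old
size-only splitting, which is why the Int.MaxValue default leaves existing
behavior unchanged.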





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-06 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r125942326
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -321,6 +321,16 @@ package object config {
   .intConf
   .createWithDefault(3)
 
+  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
+ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
+  .doc("This configuration limits the number of remote blocks being 
fetched from a given " +
+" host port at any given point. When external shuffle is enabled 
and a large number of " +
+" blocks are being requested from a given node in a single fetch 
or simultaneously, this " +
+" could crash the Node Manager under increased load. You can 
mitigate this issue by " +
+" setting it to a lower value.")
+  .intConf
+  .createWithDefault(Int.MaxValue)
--- End diff --

okay.





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-06 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r125942266
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -433,12 +449,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-// Send fetch requests up to maxBytesInFlight
-while (fetchRequests.nonEmpty &&
-  (bytesInFlight == 0 ||
-(reqsInFlight + 1 <= maxReqsInFlight &&
-  bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-  sendRequest(fetchRequests.dequeue())
+// Send fetch requests up to maxBytesInFlight. If you cannot fetch 
from a remote host
+// immediately, defer the request until the next time it can be 
processed.
+
+// Process any outstanding deferred fetch requests if possible.
+if (deferredFetchRequests.nonEmpty) {
+  for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
+while (isRemoteBlockFetchable(defReqQueue) &&
+!isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
+  val request = defReqQueue.dequeue()
+  logDebug(s"Processing deferred fetch request for $remoteAddress 
with "
++ s"${request.blocks.length} blocks")
+  send(remoteAddress, request)
+  if (defReqQueue.isEmpty) {
+deferredFetchRequests -= remoteAddress
+  }
+}
+  }
+}
+
+// Process any regular fetch requests if possible.
+while (isRemoteBlockFetchable(fetchRequests)) {
+  val request = fetchRequests.dequeue()
+  val remoteAddress = request.address
+  if (isRemoteAddressMaxedOut(remoteAddress, request)) {
+logDebug(s"Deferring fetch request for $remoteAddress with 
${request.blocks.size} blocks")
+val defReqQueue = deferredFetchRequests.getOrElse(remoteAddress, 
new Queue[FetchRequest]())
+defReqQueue.enqueue(request)
+deferredFetchRequests(remoteAddress) = defReqQueue
+  } else {
+send(remoteAddress, request)
+  }
+}
+
+def send(remoteAddress: BlockManagerId, request: FetchRequest): Unit = 
{
+  sendRequest(request)
+  numBlocksInFlightPerAddress(remoteAddress) =
+numBlocksInFlightPerAddress.getOrElse(remoteAddress, 0) + 
request.blocks.size
+}
+
+def isRemoteBlockFetchable(fetchReqQueue: Queue[FetchRequest]): 
Boolean = {
+  fetchReqQueue.nonEmpty &&
+(bytesInFlight == 0 ||
+  (reqsInFlight + 1 <= maxReqsInFlight &&
+bytesInFlight + fetchReqQueue.front.size <= maxBytesInFlight))
+}
+
+// Checks if sending a new fetch request will exceed the max no. of 
blocks being fetched from a
+// given remote address.
+def isRemoteAddressMaxedOut(remoteHost: BlockManagerId, request: 
FetchRequest): Boolean = {
--- End diff --

this should be `remoteAddress`





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-06 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r125916755
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -433,12 +449,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-    // Send fetch requests up to maxBytesInFlight
-    while (fetchRequests.nonEmpty &&
-      (bytesInFlight == 0 ||
-        (reqsInFlight + 1 <= maxReqsInFlight &&
-          bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-      sendRequest(fetchRequests.dequeue())
+    // Send fetch requests up to maxBytesInFlight. If you cannot fetch from a remote host
+    // immediately, defer the request until the next time it can be processed.
+
+    // Process any outstanding deferred fetch requests if possible.
+    if (deferredFetchRequests.nonEmpty) {
+      for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
+        while (isRemoteBlockFetchable(defReqQueue) &&
+            !isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
+          val request = defReqQueue.dequeue()
+          logDebug(s"Processing deferred fetch request for $remoteAddress with "
+            + s"${request.blocks.length} blocks")
+          send(remoteAddress, request)
+          if (defReqQueue.isEmpty) {
+            deferredFetchRequests -= remoteAddress
+          }
+        }
+      }
+    }
+
+    // Process any regular fetch requests if possible.
+    while (isRemoteBlockFetchable(fetchRequests)) {
+      val request = fetchRequests.dequeue()
+      val remoteAddress = request.address
+      if (isRemoteAddressMaxedOut(remoteAddress, request)) {
+        logDebug(s"Deferring fetch request for $remoteAddress with ${request.blocks.size} blocks")
+        val defReqQueue = deferredFetchRequests.getOrElse(remoteAddress, new Queue[FetchRequest]())
+        defReqQueue.enqueue(request)
+        deferredFetchRequests(remoteAddress) = defReqQueue
+      } else {
+        send(remoteAddress, request)
+      }
+    }
+
+    def send(remoteAddress: BlockManagerId, request: FetchRequest): Unit = {
+      sendRequest(request)
+      numBlocksInFlightPerAddress(remoteAddress) =
+        numBlocksInFlightPerAddress.getOrElse(remoteAddress, 0) + request.blocks.size
+    }
+
+    def isRemoteBlockFetchable(fetchReqQueue: Queue[FetchRequest]): Boolean = {
+      fetchReqQueue.nonEmpty &&
+        (bytesInFlight == 0 ||
+          (reqsInFlight + 1 <= maxReqsInFlight &&
+            bytesInFlight + fetchReqQueue.front.size <= maxBytesInFlight))
+    }
+
+    // Checks if sending a new fetch request will exceed the max no. of blocks being fetched from a
+    // given remote address.
+    def isRemoteAddressMaxedOut(remoteHost: BlockManagerId, request: FetchRequest): Boolean = {
--- End diff --

Is this `remoteHost` or `remoteAddress`?





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-06 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r125912117
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -321,6 +321,16 @@ package object config {
   .intConf
   .createWithDefault(3)
 
+  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
+ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
--- End diff --

I agree this won't resolve all problems, but it is still good to add the 
limit to prevent a single reducer from fetching too many blocks at a time.
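
As a rough illustration of the idea (a standalone sketch, not the PR's actual `ShuffleBlockFetcherIterator`), the per-address limit boils down to simple bookkeeping: count the blocks in flight per remote address and defer any request that would push an address over the cap. The cap value, helper name, and addresses below are purely illustrative:

```scala
import scala.collection.mutable

object PerAddressThrottleSketch {
  // Hypothetical cap; the PR wires this to spark.reducer.maxBlocksInFlightPerAddress.
  val maxBlocksInFlightPerAddress = 20

  // Blocks currently in flight, keyed by remote address.
  val numBlocksInFlightPerAddress = mutable.Map[String, Int]().withDefaultValue(0)

  // True if sending `blockCount` more blocks to `address` would exceed the cap.
  def wouldExceedCap(address: String, blockCount: Int): Boolean =
    numBlocksInFlightPerAddress(address) + blockCount > maxBlocksInFlightPerAddress

  def main(args: Array[String]): Unit = {
    numBlocksInFlightPerAddress("host-1:7337") = 15
    println(wouldExceedCap("host-1:7337", 10)) // true  -> defer this request
    println(wouldExceedCap("host-2:7337", 10)) // false -> send it now
  }
}
```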





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-06 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r125913198
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -321,6 +321,16 @@ package object config {
   .intConf
   .createWithDefault(3)
 
+  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
+    ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
+      .doc("This configuration limits the number of remote blocks being fetched from a given " +
+        " host port at any given point. When external shuffle is enabled and a large number of " +
+        " blocks are being requested from a given node in a single fetch or simultaneously, this " +
+        " could crash the Node Manager under increased load. You can mitigate this issue by " +
+        " setting it to a lower value.")
+      .intConf
+      .createWithDefault(Int.MaxValue)
--- End diff --

nit: Should add checkValue to ensure this is above zero.
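
For reference, a sketch of how that suggestion might look in core/src/main/scala/org/apache/spark/internal/config/package.scala, assuming the internal `ConfigBuilder`'s `checkValue(validator, errorMsg)` helper is used; the error message wording is only illustrative:

```scala
private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
  ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
    .doc("This configuration limits the number of remote blocks being fetched from a given " +
      "host port at any given point.")
    .intConf
    // Reject zero or negative values at configuration time.
    .checkValue(_ > 0, "The max no. of blocks in flight per address must be a positive integer.")
    .createWithDefault(Int.MaxValue)
```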





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-06 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r125912856
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -321,6 +321,16 @@ package object config {
   .intConf
   .createWithDefault(3)
 
+  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
+    ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
+      .doc("This configuration limits the number of remote blocks being fetched from a given " +
+        " host port at any given point. When external shuffle is enabled and a large number of " +
--- End diff --

How does this relate to whether the external shuffle service is enabled or not? 
AFAIK this should have little to do with the external shuffle service.





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-06 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r125921259
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -433,12 +449,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-    // Send fetch requests up to maxBytesInFlight
-    while (fetchRequests.nonEmpty &&
-      (bytesInFlight == 0 ||
-        (reqsInFlight + 1 <= maxReqsInFlight &&
-          bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-      sendRequest(fetchRequests.dequeue())
+    // Send fetch requests up to maxBytesInFlight. If you cannot fetch from a remote host
+    // immediately, defer the request until the next time it can be processed.
+
+    // Process any outstanding deferred fetch requests if possible.
+    if (deferredFetchRequests.nonEmpty) {
+      for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
+        while (isRemoteBlockFetchable(defReqQueue) &&
+            !isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
--- End diff --

If the request.blocks.size is above the config value, then 
`isRemoteAddressMaxedOut()` will always return false,  and thus we won't exit 
the while loop.





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-06 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r125917832
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -321,6 +321,16 @@ package object config {
   .intConf
   .createWithDefault(3)
 
+  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
+ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
--- End diff --

When the shuffle service gets an OOM, there are always lots of (thousands of) 
reducers (maybe from different apps) fetching blocks. I'm not sure if it will 
help much to limit in-flight blocks on the reducer side.

Also, we already have maxReqsInFlight and maxBytesInFlight. Isn't it a little 
bit redundant to also have maxBlocksInFlightPerAddress?

Sorry for this comment.
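
For context on why a block-count limit differs from the existing byte and request limits: with many small map output blocks, the byte cap alone can still allow thousands of blocks in flight against a single address. A back-of-the-envelope sketch, assuming Spark's default 48m for `spark.reducer.maxSizeInFlight` and a purely hypothetical 8 KB average block size:

```scala
object InFlightBlockCountSketch {
  def main(args: Array[String]): Unit = {
    val maxBytesInFlight = 48L * 1024 * 1024 // default spark.reducer.maxSizeInFlight (48m)
    val avgBlockSize = 8L * 1024             // hypothetical average map output block size
    println(maxBytesInFlight / avgBlockSize) // 6144 blocks could be outstanding at once
  }
}
```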





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r125888006
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -321,6 +321,16 @@ package object config {
   .intConf
   .createWithDefault(3)
 
+  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
+ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
--- End diff --

I'm not sure if it's a good idea to do this on the reducer side, because there 
may be a lot of reducers fetching data from one shuffle service at the same 
time, and you wouldn't know that on the reducer side. cc @jinxing64 





[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-06-30 Thread dhruve
GitHub user dhruve opened a pull request:

https://github.com/apache/spark/pull/18487

[SPARK-21243][Core] Limit no. of map outputs in a shuffle fetch

## What changes were proposed in this pull request?
For configurations with external shuffle enabled, we have observed that if a very large no. of 
blocks are being fetched from a remote host, it puts the NM under extra pressure and can crash it. 
This change introduces a configuration, `spark.reducer.maxBlocksInFlightPerAddress`, to limit the 
no. of map outputs being fetched from a given remote address. The change applies to both scenarios 
- when external shuffle is enabled as well as when it is disabled.
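
A minimal usage sketch (hypothetical app name and value; leaving the config unset keeps the existing behavior, since the default is Int.MaxValue):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FetchLimitUsageSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]") // local master only for this sketch
      .setAppName("shuffle-fetch-limit-demo")
      // Cap the number of map output blocks fetched from any single remote address at 20.
      .set("spark.reducer.maxBlocksInFlightPerAddress", "20")
    val sc = new SparkContext(conf)
    // ... run a shuffle-heavy job here ...
    sc.stop()
  }
}
```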

## How was this patch tested?
Ran the job with the default configuration, which does not change the existing behavior, and also 
ran it with a few lower values - 10, 20, 50, 100. The job ran fine and there is no change in the 
output. (I will update the NM-related metrics in some time.)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dhruve/spark impr/SPARK-21243

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18487.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18487


commit d60a0bef35e1c38108d3b18a40b7dc01ed8b814f
Author: Dhruve Ashar 
Date:   2017-06-30T12:58:35Z

[SPARK-21243][Core] Limit no. of map outputs in a shuffle fetch



