[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/14412





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r102621883
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1131,14 +1131,43 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Called for pro-active replenishment of blocks lost due to executor failures
+   *
+   * @param blockId blockId being replicate
+   * @param existingReplicas existing block managers that have a replica
+   * @param maxReplicas maximum replicas needed
+   * @return
+   */
+  def replicateBlock(
+blockId: BlockId,
+existingReplicas: Set[BlockManagerId],
+maxReplicas: Int): Unit = {
+logInfo(s"Pro-actively replicating $blockId")
+val infoForReplication = blockInfoManager.lockForReading(blockId).map { info =>
--- End diff --

I think that we can set `spark.storage.exceptionOnPinLeak` to `true` in 
SparkConf to do this.
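For illustration, a minimal sketch of turning that flag on in a test configuration (the conf key is real; the surrounding setup here is an assumption, not code from this PR):

```scala
import org.apache.spark.SparkConf

// Sketch: a test conf where a leaked block pin raises an exception instead of
// being silently released when the task completes.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("proactive-replication-test")
  .set("spark.storage.exceptionOnPinLeak", "true")
```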





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-22 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r102620371
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1131,14 +1131,43 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Called for pro-active replenishment of blocks lost due to executor failures
+   *
+   * @param blockId blockId being replicate
+   * @param existingReplicas existing block managers that have a replica
+   * @param maxReplicas maximum replicas needed
+   * @return
+   */
+  def replicateBlock(
+blockId: BlockId,
+existingReplicas: Set[BlockManagerId],
+maxReplicas: Int): Unit = {
+logInfo(s"Pro-actively replicating $blockId")
+val infoForReplication = blockInfoManager.lockForReading(blockId).map { info =>
--- End diff --

Nice catch! Can we also assert that all locks are released somewhere in 
`testProactiveReplication`?
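For instance, a hedged sketch of such an assertion (`getTaskLockCount` is a hypothetical introspection helper, and `blockManagers` stands in for the suite's collection of block managers; neither is an existing API):

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Sketch: once proactive replication settles, no block manager should still
// hold read or write locks on the replicated block.
eventually(timeout(5.seconds)) {
  blockManagers.foreach { bm =>
    // hypothetical helper; stands in for whatever lock introspection the
    // test API ends up exposing
    assert(bm.blockInfoManager.getTaskLockCount === 0)
  }
}
```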





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-17 Thread shubhamchopra
Github user shubhamchopra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101854267
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -195,17 +198,39 @@ class BlockManagerMasterEndpoint(
 
 // Remove it from blockManagerInfo and remove all the blocks.
 blockManagerInfo.remove(blockManagerId)
+
 val iterator = info.blocks.keySet.iterator
 while (iterator.hasNext) {
   val blockId = iterator.next
   val locations = blockLocations.get(blockId)
   locations -= blockManagerId
   if (locations.size == 0) {
 blockLocations.remove(blockId)
+logWarning(s"No more replicas available for $blockId !")
+  } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
+// only RDD blocks store data that users explicitly cache so we only need to proactively
+// replicate RDD blocks
+// broadcast related blocks exist on all executors, so we don't worry about them
+// we also need to replicate this behavior for test blocks for unit tests
+// we send a message to a randomly chosen executor location to replicate block
+// assuming single executor failure, we find out how many replicas existed before failure
+val maxReplicas = locations.size + 1
+
+val i = (new Random(blockId.hashCode)).nextInt(locations.size)
+val blockLocations = locations.toSeq
+val candidateBMId = blockLocations(i)
+val blockManager = blockManagerInfo.get(candidateBMId)
+if(blockManager.isDefined) {
+  val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
--- End diff --

If we reach this point, there is at least one location with the block, and one of those locations gets chosen as the candidate here. `remainingLocations` just tells the replication logic where the other replicas are present (if any; it can be an empty set), so it can use that information while choosing candidate executors for replication.





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-17 Thread shubhamchopra
Github user shubhamchopra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101851236
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -195,17 +198,39 @@ class BlockManagerMasterEndpoint(
 
 // Remove it from blockManagerInfo and remove all the blocks.
 blockManagerInfo.remove(blockManagerId)
+
 val iterator = info.blocks.keySet.iterator
 while (iterator.hasNext) {
   val blockId = iterator.next
   val locations = blockLocations.get(blockId)
   locations -= blockManagerId
   if (locations.size == 0) {
 blockLocations.remove(blockId)
+logWarning(s"No more replicas available for $blockId !")
+  } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
+// only RDD blocks store data that users explicitly cache so we only need to proactively
+// replicate RDD blocks
+// broadcast related blocks exist on all executors, so we don't worry about them
+// we also need to replicate this behavior for test blocks for unit tests
+// we send a message to a randomly chosen executor location to replicate block
+// assuming single executor failure, we find out how many replicas existed before failure
+val maxReplicas = locations.size + 1
+
+val i = (new Random(blockId.hashCode)).nextInt(locations.size)
--- End diff --

Scala's Random API doesn't have a choice method, and Spark's Utils class has methods to shuffle a sequence, but nothing to pick a random element.
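For reference, the shape such a helper might take if one were added (hypothetical, not an existing `Utils` method):

```scala
import scala.util.Random

// Hypothetical helper: pick one element of a non-empty sequence, seeded so
// the choice is deterministic in tests.
def randomChoice[T](xs: Seq[T], seed: Long): T = {
  require(xs.nonEmpty, "cannot choose from an empty sequence")
  xs(new Random(seed).nextInt(xs.size))
}
```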





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-17 Thread shubhamchopra
Github user shubhamchopra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101847693
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1131,14 +1131,43 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Called for pro-active replenishment of blocks lost due to executor failures
+   *
+   * @param blockId blockId being replicate
+   * @param existingReplicas existing block managers that have a replica
+   * @param maxReplicas maximum replicas needed
+   * @return
+   */
+  def replicateBlock(
+blockId: BlockId,
+existingReplicas: Set[BlockManagerId],
+maxReplicas: Int): Unit = {
+logInfo(s"Pro-actively replicating $blockId")
+val infoForReplication = blockInfoManager.lockForReading(blockId).map { info =>
+  val data = doGetLocalBytes(blockId, info)
+  val storageLevel = StorageLevel(
+info.level.useDisk,
--- End diff --

Done





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-17 Thread shubhamchopra
Github user shubhamchopra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101847672
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1131,14 +1131,43 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Called for pro-active replenishment of blocks lost due to executor failures
+   *
+   * @param blockId blockId being replicate
+   * @param existingReplicas existing block managers that have a replica
+   * @param maxReplicas maximum replicas needed
+   * @return
+   */
+  def replicateBlock(
+blockId: BlockId,
+existingReplicas: Set[BlockManagerId],
+maxReplicas: Int): Unit = {
+logInfo(s"Pro-actively replicating $blockId")
+val infoForReplication = blockInfoManager.lockForReading(blockId).map { info =>
--- End diff --

Yes, that makes sense.





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-17 Thread shubhamchopra
Github user shubhamchopra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101843543
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -195,17 +198,39 @@ class BlockManagerMasterEndpoint(
 
 // Remove it from blockManagerInfo and remove all the blocks.
 blockManagerInfo.remove(blockManagerId)
+
 val iterator = info.blocks.keySet.iterator
 while (iterator.hasNext) {
   val blockId = iterator.next
   val locations = blockLocations.get(blockId)
   locations -= blockManagerId
   if (locations.size == 0) {
 blockLocations.remove(blockId)
+logWarning(s"No more replicas available for $blockId !")
+  } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
+// only RDD blocks store data that users explicitly cache so we only need to proactively
--- End diff --

Agree with the suggestion. Modifying it. Thanks!





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-17 Thread shubhamchopra
Github user shubhamchopra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101843045
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1131,14 +1131,47 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Called for pro-active replenishment of blocks lost due to executor failures
+   *
+   * @param blockId blockId being replicate
+   * @param replicas existing block managers that have a replica
+   * @param maxReps maximum replicas needed
+   * @return
+   */
+  def replicateBlock(blockId: BlockId, replicas: Set[BlockManagerId], maxReps: Int): Boolean = {
+logInfo(s"Pro-actively replicating $blockId")
+val infoForReplication = blockInfoManager.lockForReading(blockId).map { info =>
+  val data = doGetLocalBytes(blockId, info)
+  val storageLevel = StorageLevel(
+info.level.useDisk,
+info.level.useMemory,
+info.level.useOffHeap,
+info.level.deserialized,
+maxReps)
+  (data, storageLevel, info.classTag)
+}
+infoForReplication.foreach { case (data, storageLevel, classTag) =>
+  replicate(blockId, data, storageLevel, classTag, replicas)
+}
+true
+  }
+
+  /**
* Replicate block to another node. Note that this is a blocking call that returns after
* the block has been replicated.
+   *
+   * @param blockId
+   * @param data
+   * @param level
+   * @param classTag
+   * @param existingReplicas
*/
   private def replicate(
-  blockId: BlockId,
-  data: ChunkedByteBuffer,
-  level: StorageLevel,
-  classTag: ClassTag[_]): Unit = {
--- End diff --

@sameeragarwal This code is being removed as part of this PR; the code replacing it has this fixed.





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-15 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101447842
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -65,6 +66,8 @@ class BlockManagerMasterEndpoint(
 mapper
   }
 
+  val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
--- End diff --

Please document this new configuration in `docs/configuration.md`.





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-15 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101447797
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -195,17 +198,39 @@ class BlockManagerMasterEndpoint(
 
 // Remove it from blockManagerInfo and remove all the blocks.
 blockManagerInfo.remove(blockManagerId)
+
 val iterator = info.blocks.keySet.iterator
 while (iterator.hasNext) {
   val blockId = iterator.next
   val locations = blockLocations.get(blockId)
   locations -= blockManagerId
   if (locations.size == 0) {
 blockLocations.remove(blockId)
+logWarning(s"No more replicas available for $blockId !")
+  } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
+// only RDD blocks store data that users explicitly cache so we only need to proactively
+// replicate RDD blocks
+// broadcast related blocks exist on all executors, so we don't worry about them
+// we also need to replicate this behavior for test blocks for unit tests
+// we send a message to a randomly chosen executor location to replicate block
+// assuming single executor failure, we find out how many replicas existed before failure
+val maxReplicas = locations.size + 1
+
+val i = (new Random(blockId.hashCode)).nextInt(locations.size)
+val blockLocations = locations.toSeq
+val candidateBMId = blockLocations(i)
+val blockManager = blockManagerInfo.get(candidateBMId)
+if(blockManager.isDefined) {
+  val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
--- End diff --

Is it possible for this list to be empty in certain corner-cases? What 
happens if `ReplicateBlock` is called with an empty set of locations? Is it 
just a no-op in that case?





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-15 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101447703
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -195,17 +198,39 @@ class BlockManagerMasterEndpoint(
 
 // Remove it from blockManagerInfo and remove all the blocks.
 blockManagerInfo.remove(blockManagerId)
+
 val iterator = info.blocks.keySet.iterator
 while (iterator.hasNext) {
   val blockId = iterator.next
   val locations = blockLocations.get(blockId)
   locations -= blockManagerId
   if (locations.size == 0) {
 blockLocations.remove(blockId)
+logWarning(s"No more replicas available for $blockId !")
+  } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
+// only RDD blocks store data that users explicitly cache so we only need to proactively
+// replicate RDD blocks
+// broadcast related blocks exist on all executors, so we don't worry about them
+// we also need to replicate this behavior for test blocks for unit tests
+// we send a message to a randomly chosen executor location to replicate block
+// assuming single executor failure, we find out how many replicas existed before failure
+val maxReplicas = locations.size + 1
+
+val i = (new Random(blockId.hashCode)).nextInt(locations.size)
+val blockLocations = locations.toSeq
+val candidateBMId = blockLocations(i)
+val blockManager = blockManagerInfo.get(candidateBMId)
+if(blockManager.isDefined) {
--- End diff --

If you're not going to have an `else` branch here then you might as well just `foreach` over the result of `blockManagerInfo.get(candidateBMId)`.
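That is, roughly (a sketch of the shape, following the suggestion elsewhere in this review; not the PR's final code):

```scala
// Option.foreach replaces the isDefined check and the .get calls.
blockManagerInfo.get(candidateBMId).foreach { bm =>
  val remainingLocations = locations.toSeq.filter(_ != candidateBMId)
  val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
  bm.slaveEndpoint.ask[Boolean](replicateMsg)
}
```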





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-15 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101446619
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1131,14 +1131,43 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Called for pro-active replenishment of blocks lost due to executor failures
+   *
+   * @param blockId blockId being replicate
+   * @param existingReplicas existing block managers that have a replica
+   * @param maxReplicas maximum replicas needed
+   * @return
+   */
+  def replicateBlock(
+blockId: BlockId,
--- End diff --

Same as Sameer's comment elsewhere in the code, we should fix the 
indentation here.





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-15 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101447299
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1131,14 +1131,43 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Called for pro-active replenishment of blocks lost due to executor failures
+   *
+   * @param blockId blockId being replicate
+   * @param existingReplicas existing block managers that have a replica
+   * @param maxReplicas maximum replicas needed
+   * @return
+   */
+  def replicateBlock(
+blockId: BlockId,
+existingReplicas: Set[BlockManagerId],
+maxReplicas: Int): Unit = {
+logInfo(s"Pro-actively replicating $blockId")
+val infoForReplication = blockInfoManager.lockForReading(blockId).map { info =>
--- End diff --

This call acquires a read lock on the block, but when is that lock 
released? Per the Scaladoc of `doGetLocalBytes`, you need to be holding a read 
lock before calling that method, but upon successful return from that method 
the read lock will still be held by the caller.

I think what you want to do is acquire the lock, immediately call 
`doGetLocalBytes`, then begin a `try-finally` statement to call `replicate()` 
and unlock / release the lock in the `finally` block.
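A sketch of that structure (assuming `releaseLock` is the release path, as it is for other read-lock holders on `BlockManager`; this is the suggested shape, not the PR's final code):

```scala
// Hold the read lock only as long as replication needs it, and release it
// even if replicate() throws.
blockInfoManager.lockForReading(blockId).foreach { info =>
  val data = doGetLocalBytes(blockId, info)
  val storageLevel = StorageLevel(
    useDisk = info.level.useDisk,
    useMemory = info.level.useMemory,
    useOffHeap = info.level.useOffHeap,
    deserialized = info.level.deserialized,
    replication = maxReplicas)
  try {
    replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
  } finally {
    releaseLock(blockId)  // release the read lock whether or not replication succeeded
  }
}
```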





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-15 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101447394
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1131,14 +1131,43 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Called for pro-active replenishment of blocks lost due to executor failures
+   *
+   * @param blockId blockId being replicate
+   * @param existingReplicas existing block managers that have a replica
+   * @param maxReplicas maximum replicas needed
+   * @return
+   */
+  def replicateBlock(
+blockId: BlockId,
+existingReplicas: Set[BlockManagerId],
+maxReplicas: Int): Unit = {
+logInfo(s"Pro-actively replicating $blockId")
+val infoForReplication = blockInfoManager.lockForReading(blockId).map { info =>
--- End diff --

Also, I don't think there's a need to have separate `.map` and `.foreach` 
calls over the option. Instead, I think it would be clearer to avoid the 
assignment to the `infoForReplication` variable and just perform all of the 
work inside of a `.foreach` call on the `Option` with the block info.
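In other words (a generic sketch of the shape; `prepare` and `replicateWith` are placeholder names, not this file's code):

```scala
// Instead of materializing an intermediate Option...
val infoForReplication = blockInfoManager.lockForReading(blockId).map(prepare)
infoForReplication.foreach(replicateWith)

// ...do the work in a single pass over the Option:
blockInfoManager.lockForReading(blockId).foreach { info =>
  replicateWith(prepare(info))
}
```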





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-15 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101446632
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1131,14 +1131,43 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Called for pro-active replenishment of blocks lost due to executor failures
+   *
+   * @param blockId blockId being replicate
+   * @param existingReplicas existing block managers that have a replica
+   * @param maxReplicas maximum replicas needed
+   * @return
--- End diff --

You can omit this `@return` since this method doesn't have a return value.





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-15 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101446759
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -195,17 +198,39 @@ class BlockManagerMasterEndpoint(
 
 // Remove it from blockManagerInfo and remove all the blocks.
 blockManagerInfo.remove(blockManagerId)
+
 val iterator = info.blocks.keySet.iterator
 while (iterator.hasNext) {
   val blockId = iterator.next
   val locations = blockLocations.get(blockId)
   locations -= blockManagerId
   if (locations.size == 0) {
 blockLocations.remove(blockId)
+logWarning(s"No more replicas available for $blockId !")
+  } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
+// only RDD blocks store data that users explicitly cache so we only need to proactively
+// replicate RDD blocks
+// broadcast related blocks exist on all executors, so we don't worry about them
+// we also need to replicate this behavior for test blocks for unit tests
+// we send a message to a randomly chosen executor location to replicate block
+// assuming single executor failure, we find out how many replicas existed before failure
+val maxReplicas = locations.size + 1
+
+val i = (new Random(blockId.hashCode)).nextInt(locations.size)
+val blockLocations = locations.toSeq
+val candidateBMId = blockLocations(i)
+val blockManager = blockManagerInfo.get(candidateBMId)
+if(blockManager.isDefined) {
--- End diff --

Nit: space after `if`.





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-15 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101447547
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -195,17 +198,39 @@ class BlockManagerMasterEndpoint(
 
 // Remove it from blockManagerInfo and remove all the blocks.
 blockManagerInfo.remove(blockManagerId)
+
 val iterator = info.blocks.keySet.iterator
 while (iterator.hasNext) {
   val blockId = iterator.next
   val locations = blockLocations.get(blockId)
   locations -= blockManagerId
   if (locations.size == 0) {
 blockLocations.remove(blockId)
+logWarning(s"No more replicas available for $blockId !")
+  } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
+// only RDD blocks store data that users explicitly cache so we only need to proactively
--- End diff --

+1 on Sameer's suggestions. This code is a little subtle and benefits from 
a clearer comment.





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-15 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101447632
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -195,17 +198,39 @@ class BlockManagerMasterEndpoint(
 
 // Remove it from blockManagerInfo and remove all the blocks.
 blockManagerInfo.remove(blockManagerId)
+
 val iterator = info.blocks.keySet.iterator
 while (iterator.hasNext) {
   val blockId = iterator.next
   val locations = blockLocations.get(blockId)
   locations -= blockManagerId
   if (locations.size == 0) {
 blockLocations.remove(blockId)
+logWarning(s"No more replicas available for $blockId !")
+  } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
+// only RDD blocks store data that users explicitly cache so we only need to proactively
+// replicate RDD blocks
+// broadcast related blocks exist on all executors, so we don't worry about them
+// we also need to replicate this behavior for test blocks for unit tests
+// we send a message to a randomly chosen executor location to replicate block
+// assuming single executor failure, we find out how many replicas existed before failure
+val maxReplicas = locations.size + 1
+
+val i = (new Random(blockId.hashCode)).nextInt(locations.size)
--- End diff --

Why do we need to use a fixed random seed here? Testing?

Also, isn't there a `Random.choice()` that you can use for this? Or a 
method like that in our own `Utils` class?





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-15 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101446961
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1131,14 +1131,43 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Called for pro-active replenishment of blocks lost due to executor failures
+   *
+   * @param blockId blockId being replicate
+   * @param existingReplicas existing block managers that have a replica
+   * @param maxReplicas maximum replicas needed
+   * @return
+   */
+  def replicateBlock(
+blockId: BlockId,
+existingReplicas: Set[BlockManagerId],
+maxReplicas: Int): Unit = {
+logInfo(s"Pro-actively replicating $blockId")
+val infoForReplication = blockInfoManager.lockForReading(blockId).map { info =>
+  val data = doGetLocalBytes(blockId, info)
+  val storageLevel = StorageLevel(
+info.level.useDisk,
--- End diff --

Minor nit, but a problem with the `StorageLevel` constructor is that it has 
a bunch of adjacent boolean parameters, so in such cases I'd usually prefer to 
name all of the parameters explicitly at the call site in order to avoid errors 
should these lines get permuted / to convince readers that the API is being 
used correctly.

Thus I'd probably write each line like `useDisk = info.level.useDisk,` etc.
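Concretely, applied to the call site quoted above (the named-argument form the comment suggests):

```scala
// Naming the adjacent boolean arguments guards against transposition errors.
val storageLevel = StorageLevel(
  useDisk = info.level.useDisk,
  useMemory = info.level.useMemory,
  useOffHeap = info.level.useOffHeap,
  deserialized = info.level.deserialized,
  replication = maxReplicas)
```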





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-15 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101432219
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -195,17 +198,39 @@ class BlockManagerMasterEndpoint(
 
 // Remove it from blockManagerInfo and remove all the blocks.
 blockManagerInfo.remove(blockManagerId)
+
 val iterator = info.blocks.keySet.iterator
 while (iterator.hasNext) {
   val blockId = iterator.next
   val locations = blockLocations.get(blockId)
   locations -= blockManagerId
   if (locations.size == 0) {
 blockLocations.remove(blockId)
+logWarning(s"No more replicas available for $blockId !")
+  } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
+// only RDD blocks store data that users explicitly cache so we only need to proactively
--- End diff --

Just a nit, but how about we split this comment into two separate ones for better readability around the two separate sets of issues. My suggestion:

```scala
  // De-register the block if none of the block managers have it. Otherwise, if pro-active
  // replication is enabled, and a block is either an RDD or a test block (the latter is used
  // for unit testing), we send a message to a randomly chosen executor location to replicate
  // the given block. Note that we ignore other block types (such as broadcast/shuffle blocks
  // etc.) as replication doesn't make much sense in that context.
  if (locations.size == 0) {
    blockLocations.remove(blockId)
  } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
    // As a heuristic, assume single executor failure to find out the number of replicas that
    // existed before failure
    val maxReplicas = locations.size + 1

    val i = (new Random(blockId.hashCode)).nextInt(locations.size)
    val blockLocations = locations.toSeq
    val candidateBMId = blockLocations(i)
    val blockManager = blockManagerInfo.get(candidateBMId)
    if (blockManager.isDefined) {
      val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
      val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
      blockManager.get.slaveEndpoint.ask[Boolean](replicateMsg)
    }
  }
```





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-15 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r101430017
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1131,14 +1131,47 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Called for pro-active replenishment of blocks lost due to executor failures
+   *
+   * @param blockId blockId being replicate
+   * @param replicas existing block managers that have a replica
+   * @param maxReps maximum replicas needed
+   * @return
+   */
+  def replicateBlock(blockId: BlockId, replicas: Set[BlockManagerId], maxReps: Int): Boolean = {
+logInfo(s"Pro-actively replicating $blockId")
+val infoForReplication = blockInfoManager.lockForReading(blockId).map { info =>
+  val data = doGetLocalBytes(blockId, info)
+  val storageLevel = StorageLevel(
+info.level.useDisk,
+info.level.useMemory,
+info.level.useOffHeap,
+info.level.deserialized,
+maxReps)
+  (data, storageLevel, info.classTag)
+}
+infoForReplication.foreach { case (data, storageLevel, classTag) =>
+  replicate(blockId, data, storageLevel, classTag, replicas)
+}
+true
+  }
+
+  /**
* Replicate block to another node. Note that this is a blocking call that returns after
* the block has been replicated.
+   *
+   * @param blockId
+   * @param data
+   * @param level
+   * @param classTag
+   * @param existingReplicas
*/
   private def replicate(
-  blockId: BlockId,
-  data: ChunkedByteBuffer,
-  level: StorageLevel,
-  classTag: ClassTag[_]): Unit = {
--- End diff --

nit: this still needs fixing





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-02 Thread shubhamchopra
Github user shubhamchopra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r99237148
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -188,24 +189,45 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeBlockManager(blockManagerId: BlockManagerId) {
+val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
+
 val info = blockManagerInfo(blockManagerId)
 
 // Remove the block manager from blockManagerIdByExecutor.
 blockManagerIdByExecutor -= blockManagerId.executorId
 
-// Remove it from blockManagerInfo and remove all the blocks.
-blockManagerInfo.remove(blockManagerId)
 val iterator = info.blocks.keySet.iterator
 while (iterator.hasNext) {
   val blockId = iterator.next
   val locations = blockLocations.get(blockId)
   locations -= blockManagerId
   if (locations.size == 0) {
 blockLocations.remove(blockId)
+logWarning(s"No more replicas available for $blockId !")
+  } else if ((blockId.isRDD || blockId.isInstanceOf[TestBlockId]) && proactivelyReplicate) {
+// we only need to proactively replicate RDD blocks
+// we also need to replicate this behavior for test blocks for unit tests
+// we send a message to a randomly chosen executor location to replicate block
+// assuming single executor failure, we find out how many replicas existed before failure
+val maxReplicas = locations.size + 1
--- End diff --

Yes, that's a tough one. The way replication is implemented, the correct storage level is only available with one of the blocks at the BlockManager layer (we don't have access to the RDD this block is a part of, so we can't extract the information from there). The remaining blocks all have their storage level's replication set to 1, so I use the number of locations to approximate the original replication level: for example, if a block had 3 replicas and a single executor fails, the 2 surviving locations give `maxReplicas = 2 + 1 = 3`.





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-02 Thread shubhamchopra
Github user shubhamchopra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r99190414
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -188,24 +189,45 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeBlockManager(blockManagerId: BlockManagerId) {
+val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
--- End diff --

Removed from the function.





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-02 Thread shubhamchopra
Github user shubhamchopra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r99189219
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1152,20 +1185,25 @@ private[spark] class BlockManager(
 
 val startTime = System.nanoTime
 
-var peersReplicatedTo = mutable.HashSet.empty[BlockManagerId]
+var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
 var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
 var numFailures = 0
 
+val initialPeers = {
+  val peers = getPeers(false)
+  if(existingReplicas.isEmpty) peers else peers.filter(!existingReplicas.contains(_))
+}
+
 var peersForReplication = blockReplicationPolicy.prioritize(
   blockManagerId,
-  getPeers(false),
-  mutable.HashSet.empty,
+  initialPeers,
+  peersReplicatedTo,
   blockId,
   numPeersToReplicateTo)
 
 while(numFailures <= maxReplicationFailures &&
-!peersForReplication.isEmpty &&
-peersReplicatedTo.size != numPeersToReplicateTo) {
+  !peersForReplication.isEmpty &&
+  peersReplicatedTo.size < numPeersToReplicateTo) {
--- End diff --

One scenario I can think of is an executor with the block being replicated getting lost (due to, say, a delayed heartbeat) and then joining back again. The current implementation would recognize that the block manager needs to re-register and will report all blocks. The probability of this happening increases with pro-active replication, I think.





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-02 Thread shubhamchopra
Github user shubhamchopra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r99185105
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1152,20 +1185,25 @@ private[spark] class BlockManager(
 
 val startTime = System.nanoTime
 
-var peersReplicatedTo = mutable.HashSet.empty[BlockManagerId]
+var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
 var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
 var numFailures = 0
 
+val initialPeers = {
--- End diff --

Done





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-02 Thread shubhamchopra
Github user shubhamchopra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r99174290
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1131,14 +1131,47 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Called for pro-active replenishment of blocks lost due to executor failures
+   *
+   * @param blockId blockId being replicate
+   * @param replicas existing block managers that have a replica
+   * @param maxReps maximum replicas needed
+   * @return
+   */
+  def replicateBlock(blockId: BlockId, replicas: Set[BlockManagerId], maxReps: Int): Boolean = {
--- End diff --

This doesn't need to return a boolean. Changing the return type to Unit. 
Also changing the variable names.





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-02-02 Thread shubhamchopra
Github user shubhamchopra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r99174354
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1131,14 +1131,47 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Called for pro-active replenishment of blocks lost due to executor failures
+   *
+   * @param blockId blockId being replicate
+   * @param replicas existing block managers that have a replica
+   * @param maxReps maximum replicas needed
+   * @return
+   */
+  def replicateBlock(blockId: BlockId, replicas: Set[BlockManagerId], maxReps: Int): Boolean = {
+logInfo(s"Pro-actively replicating $blockId")
+val infoForReplication = blockInfoManager.lockForReading(blockId).map { info =>
+  val data = doGetLocalBytes(blockId, info)
+  val storageLevel = StorageLevel(
+info.level.useDisk,
+info.level.useMemory,
+info.level.useOffHeap,
+info.level.deserialized,
+maxReps)
+  (data, storageLevel, info.classTag)
+}
+infoForReplication.foreach { case (data, storageLevel, classTag) =>
+  replicate(blockId, data, storageLevel, classTag, replicas)
+}
+true
+  }
+
+  /**
* Replicate block to another node. Note that this is a blocking call that returns after
* the block has been replicated.
+   *
+   * @param blockId
+   * @param data
+   * @param level
+   * @param classTag
+   * @param existingReplicas
--- End diff --

Removing these.





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-01-30 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r98604570
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -188,24 +189,45 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeBlockManager(blockManagerId: BlockManagerId) {
+val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
+
 val info = blockManagerInfo(blockManagerId)
 
 // Remove the block manager from blockManagerIdByExecutor.
 blockManagerIdByExecutor -= blockManagerId.executorId
 
-// Remove it from blockManagerInfo and remove all the blocks.
-blockManagerInfo.remove(blockManagerId)
 val iterator = info.blocks.keySet.iterator
 while (iterator.hasNext) {
   val blockId = iterator.next
   val locations = blockLocations.get(blockId)
   locations -= blockManagerId
   if (locations.size == 0) {
 blockLocations.remove(blockId)
+logWarning(s"No more replicas available for $blockId !")
+  } else if ((blockId.isRDD || blockId.isInstanceOf[TestBlockId]) && proactivelyReplicate) {
+// we only need to proactively replicate RDD blocks
--- End diff --

Also, it might be nicer to make `proactivelyReplicate` the first check for better short-circuiting, so the cheap flag check runs before the per-block type checks.





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-01-30 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r98603116
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1131,14 +1131,47 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Called for pro-active replenishment of blocks lost due to executor failures
+   *
+   * @param blockId blockId being replicate
+   * @param replicas existing block managers that have a replica
+   * @param maxReps maximum replicas needed
+   * @return
+   */
+  def replicateBlock(blockId: BlockId, replicas: Set[BlockManagerId], maxReps: Int): Boolean = {
+logInfo(s"Pro-actively replicating $blockId")
+val infoForReplication = blockInfoManager.lockForReading(blockId).map { info =>
+  val data = doGetLocalBytes(blockId, info)
+  val storageLevel = StorageLevel(
+info.level.useDisk,
+info.level.useMemory,
+info.level.useOffHeap,
+info.level.deserialized,
+maxReps)
+  (data, storageLevel, info.classTag)
+}
+infoForReplication.foreach { case (data, storageLevel, classTag) =>
+  replicate(blockId, data, storageLevel, classTag, replicas)
+}
+true
+  }
+
+  /**
* Replicate block to another node. Note that this is a blocking call that returns after
* the block has been replicated.
+   *
+   * @param blockId
+   * @param data
+   * @param level
+   * @param classTag
+   * @param existingReplicas
--- End diff --

Let's either document these params or just remove them





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-01-30 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r98604381
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -188,24 +189,45 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeBlockManager(blockManagerId: BlockManagerId) {
+val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
+
 val info = blockManagerInfo(blockManagerId)
 
 // Remove the block manager from blockManagerIdByExecutor.
 blockManagerIdByExecutor -= blockManagerId.executorId
 
-// Remove it from blockManagerInfo and remove all the blocks.
-blockManagerInfo.remove(blockManagerId)
 val iterator = info.blocks.keySet.iterator
 while (iterator.hasNext) {
   val blockId = iterator.next
   val locations = blockLocations.get(blockId)
   locations -= blockManagerId
   if (locations.size == 0) {
 blockLocations.remove(blockId)
+logWarning(s"No more replicas available for $blockId !")
+  } else if ((blockId.isRDD || blockId.isInstanceOf[TestBlockId]) && proactivelyReplicate) {
+// we only need to proactively replicate RDD blocks
--- End diff --

I think it makes sense overall, but it'd be great to add some more comments about why we are only concerned with RDD blocks and not others.





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-01-30 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r98603889
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -188,24 +189,45 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeBlockManager(blockManagerId: BlockManagerId) {
+val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
--- End diff --

can probably move this out of the function block
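Something like this sketch (constructor parameters abbreviated), which matches what a later revision of the PR does:

```scala
import org.apache.spark.SparkConf

// Sketch: read the flag once at construction instead of on every
// removeBlockManager call.
class BlockManagerMasterEndpoint(conf: SparkConf /* , ... */) {
  val proactivelyReplicate =
    conf.get("spark.storage.replication.proactive", "false").toBoolean
}
```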





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-01-30 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r98603866
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1152,20 +1185,25 @@ private[spark] class BlockManager(
 
 val startTime = System.nanoTime
 
-var peersReplicatedTo = mutable.HashSet.empty[BlockManagerId]
+var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
 var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
 var numFailures = 0
 
+val initialPeers = {
+  val peers = getPeers(false)
+  if(existingReplicas.isEmpty) peers else peers.filter(!existingReplicas.contains(_))
+}
+
 var peersForReplication = blockReplicationPolicy.prioritize(
   blockManagerId,
-  getPeers(false),
-  mutable.HashSet.empty,
+  initialPeers,
+  peersReplicatedTo,
   blockId,
   numPeersToReplicateTo)
 
 while(numFailures <= maxReplicationFailures &&
-!peersForReplication.isEmpty &&
-peersReplicatedTo.size != numPeersToReplicateTo) {
+  !peersForReplication.isEmpty &&
+  peersReplicatedTo.size < numPeersToReplicateTo) {
--- End diff --

I think it's still valid to replace the inequality with a strictly-less-than 
check, but just out of curiosity: can the number of `peersReplicatedTo` ever 
exceed `numPeersToReplicateTo`?
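
One hedged way to see why `<` is the safer guard once the set is pre-seeded (made-up values, not Spark code): if the seeded `existingReplicas` ever exceeds the computed target, `!=` keeps the loop replicating to further peers until the candidate list is exhausted, while `<` stops immediately.

```scala
// Illustration: peersReplicatedTo starts pre-seeded from existingReplicas,
// so it can begin at, or conceivably above, the target count.
val existingReplicas = Set("bm-1", "bm-2", "bm-3")
val peersReplicatedTo = scala.collection.mutable.HashSet.empty[String] ++ existingReplicas
val numPeersToReplicateTo = 2

// `!=` is still true here, so the loop would keep going;
// `<` is false and skips the loop body immediately.
assert(peersReplicatedTo.size != numPeersToReplicateTo)
assert(!(peersReplicatedTo.size < numPeersToReplicateTo))
```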





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-01-30 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r98605803
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
---
@@ -188,24 +189,45 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeBlockManager(blockManagerId: BlockManagerId) {
+val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
+
 val info = blockManagerInfo(blockManagerId)
 
 // Remove the block manager from blockManagerIdByExecutor.
 blockManagerIdByExecutor -= blockManagerId.executorId
 
-// Remove it from blockManagerInfo and remove all the blocks.
-blockManagerInfo.remove(blockManagerId)
 val iterator = info.blocks.keySet.iterator
 while (iterator.hasNext) {
   val blockId = iterator.next
   val locations = blockLocations.get(blockId)
   locations -= blockManagerId
   if (locations.size == 0) {
 blockLocations.remove(blockId)
+logWarning(s"No more replicas available for $blockId !")
+  } else if ((blockId.isRDD || blockId.isInstanceOf[TestBlockId]) && proactivelyReplicate) {
+// we only need to proactively replicate RDD blocks
+// we also need to replicate this behavior for test blocks for unit tests
+// we send a message to a randomly chosen executor location to replicate block
+// assuming single executor failure, we find out how many replicas existed before failure
+val maxReplicas = locations.size + 1
--- End diff --

What happens if multiple executors are removed simultaneously? Depending on 
the invocation sequence, is it possible for `maxReplicas` to be significantly 
less than the original number of replicas?
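
To make the concern concrete, a hypothetical timeline (assumes a block originally replicated 3x and two near-simultaneous failures; executor names are made up):

```scala
// Suppose rdd_0_0 is stored on executors A, B and C (replication = 3):
//
//   blockLocations(rdd_0_0) = {A, B, C}
//   removeBlockManager(A): locations = {B, C} -> maxReplicas = 2 + 1 = 3  // matches original
//   removeBlockManager(B): locations = {C}    -> maxReplicas = 1 + 1 = 2  // original was 3
//
// If B's removal is processed before the re-replication triggered by A's
// removal has registered a new copy with the master, the recomputed target
// undershoots the original replication level.
```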





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-01-30 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r98603462
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -1152,20 +1185,25 @@ private[spark] class BlockManager(
 
 val startTime = System.nanoTime
 
-var peersReplicatedTo = mutable.HashSet.empty[BlockManagerId]
+var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
 var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
 var numFailures = 0
 
+val initialPeers = {
--- End diff --

can this not be just:

```scala
val initialPeers = getPeers(false).filter(!existingReplicas.contains(_))
```
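
If adopted, `filterNot` is a marginally more idiomatic spelling of the same thing (a stylistic sketch, same behavior):

```scala
val initialPeers = getPeers(false).filterNot(existingReplicas.contains)
```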





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-01-30 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r98603908
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
---
@@ -188,24 +189,45 @@ class BlockManagerMasterEndpoint(
   }
 
   private def removeBlockManager(blockManagerId: BlockManagerId) {
+val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
+
 val info = blockManagerInfo(blockManagerId)
 
 // Remove the block manager from blockManagerIdByExecutor.
 blockManagerIdByExecutor -= blockManagerId.executorId
 
-// Remove it from blockManagerInfo and remove all the blocks.
-blockManagerInfo.remove(blockManagerId)
--- End diff --

why did you move this to the end?





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-01-30 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r98602812
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -1131,14 +1131,47 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Called for pro-active replenishment of blocks lost due to executor failures
+   *
+   * @param blockId blockId being replicate
+   * @param replicas existing block managers that have a replica
+   * @param maxReps maximum replicas needed
+   * @return
+   */
+  def replicateBlock(blockId: BlockId, replicas: Set[BlockManagerId], maxReps: Int): Boolean = {
--- End diff --

How about something like this for better readability?

```scala
def replicateBlock(blockId: BlockId, existingReplicas: Set[BlockManagerId], maxReplicas: Int)
```

Also, is there a reason this returns a `Boolean`?
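
For readability, the full suggested shape might look like the sketch below (assuming no caller consumes the Boolean, so it can return Unit):

```scala
def replicateBlock(
    blockId: BlockId,
    existingReplicas: Set[BlockManagerId],
    maxReplicas: Int): Unit = {
  // ... same body as in the diff, minus the trailing `true`
}
```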





[GitHub] spark pull request #14412: [SPARK-15355] [CORE] Proactive block replication

2017-01-30 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/14412#discussion_r98603310
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -1131,14 +1131,47 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Called for pro-active replenishment of blocks lost due to executor failures
+   *
+   * @param blockId blockId being replicate
+   * @param replicas existing block managers that have a replica
+   * @param maxReps maximum replicas needed
+   * @return
+   */
+  def replicateBlock(blockId: BlockId, replicas: Set[BlockManagerId], maxReps: Int): Boolean = {
+logInfo(s"Pro-actively replicating $blockId")
+val infoForReplication = blockInfoManager.lockForReading(blockId).map { info =>
+  val data = doGetLocalBytes(blockId, info)
+  val storageLevel = StorageLevel(
+info.level.useDisk,
+info.level.useMemory,
+info.level.useOffHeap,
+info.level.deserialized,
+maxReps)
+  (data, storageLevel, info.classTag)
+}
+infoForReplication.foreach { case (data, storageLevel, classTag) =>
+  replicate(blockId, data, storageLevel, classTag, replicas)
+}
+true
+  }
+
+  /**
* Replicate block to another node. Note that this is a blocking call that returns after
* the block has been replicated.
+   *
+   * @param blockId
+   * @param data
+   * @param level
+   * @param classTag
+   * @param existingReplicas
*/
   private def replicate(
-  blockId: BlockId,
-  data: ChunkedByteBuffer,
-  level: StorageLevel,
-  classTag: ClassTag[_]): Unit = {
--- End diff --

nit: 4 spaces 
(https://github.com/databricks/scala-style-guide/#spacing-and-indentation)
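
Concretely, the guide's 4-space continuation indent applied to this declaration would look like the sketch below (the `Set.empty` default for `existingReplicas` is an assumption, not taken from the diff):

```scala
private def replicate(
    blockId: BlockId,
    data: ChunkedByteBuffer,
    level: StorageLevel,
    classTag: ClassTag[_],
    existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
  // ...
}
```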

