[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18200248
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -787,31 +791,110 @@ private[spark] class BlockManager(
   }

   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+    peerFetchLock.synchronized {
+      val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
--- End diff --

I didn't want to pollute the namespace inside the BlockManager class any more than absolutely necessary. :)





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18200286
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -111,6 +112,9 @@ private[spark] class BlockManager(
     MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
   private val broadcastCleaner = new MetadataCleaner(
     MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
+  @volatile private var cachedPeers: Seq[BlockManagerId] = _
--- End diff --

Good idea. Should have done that myself.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18200328
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -787,31 +791,110 @@ private[spark] class BlockManager(
   }

   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+    peerFetchLock.synchronized {
+      val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+      val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+      if (cachedPeers == null || forceFetch || timeout) {
+        cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
+        lastPeerFetchTime = System.currentTimeMillis
+        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      }
+      cachedPeers
+    }
+  }
+
+  /**
    * Replicate block to another node.
--- End diff --

Done.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18201177
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -787,31 +791,110 @@ private[spark] class BlockManager(
   }

   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+    peerFetchLock.synchronized {
+      val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+      val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+      if (cachedPeers == null || forceFetch || timeout) {
+        cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
+        lastPeerFetchTime = System.currentTimeMillis
+        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      }
+      cachedPeers
+    }
+  }
+
+  /**
    * Replicate block to another node.
    */
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+    val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
+    val numPeersToReplicateTo = level.replication - 1
+    val peersForReplication = new ArrayBuffer[BlockManagerId]
--- End diff --

Why 3? This will be as large as `cluster size - 1`. 





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-29 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18201208
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -787,31 +791,110 @@ private[spark] class BlockManager(
   }

   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+    peerFetchLock.synchronized {
+      val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+      val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+      if (cachedPeers == null || forceFetch || timeout) {
+        cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
+        lastPeerFetchTime = System.currentTimeMillis
+        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      }
+      cachedPeers
+    }
+  }
+
+  /**
    * Replicate block to another node.
    */
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+    val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
+    val numPeersToReplicateTo = level.replication - 1
+    val peersForReplication = new ArrayBuffer[BlockManagerId]
--- End diff --

Never mind, ignore that comment.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18201230
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -787,31 +791,110 @@ private[spark] class BlockManager(
   }

   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+    peerFetchLock.synchronized {
+      val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+      val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+      if (cachedPeers == null || forceFetch || timeout) {
+        cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
+        lastPeerFetchTime = System.currentTimeMillis
+        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      }
+      cachedPeers
+    }
+  }
+
+  /**
    * Replicate block to another node.
    */
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+    val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
+    val numPeersToReplicateTo = level.replication - 1
+    val peersForReplication = new ArrayBuffer[BlockManagerId]
+    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
+    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
-    if (cachedPeers == null) {
-      cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+    val startTime = System.nanoTime
+    val random = new Random(blockId.hashCode)
+
+    var replicationFailed = false
+    var failures = 0
+    var done = false
+
+    // Get cached list of peers
+    peersForReplication ++= getPeers(forceFetch = false)
+
+    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
+    // So assuming the list of peers does not change and no replication failures,
+    // if there are multiple attempts in the same node to replicate the same block,
+    // the same set of peers will be selected.
+    def getRandomPeer(): Option[BlockManagerId] = {
+      // If replication had failed, then force update the cached list of peers and remove the peers
+      // that have been already used
+      if (replicationFailed) {
+        peersForReplication.clear()
+        peersForReplication ++= getPeers(forceFetch = true)
+        peersForReplication --= peersReplicatedTo
+        peersForReplication --= peersFailedToReplicateTo
+      }
+      if (!peersForReplication.isEmpty) {
+        Some(peersForReplication(random.nextInt(peersForReplication.size)))
+      } else {
+        None
+      }
     }
-    for (peer: BlockManagerId <- cachedPeers) {
-      val start = System.nanoTime
-      data.rewind()
-      logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
-        s"To node: $peer")

-      try {
-        blockTransferService.uploadBlockSync(
-          peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
-      } catch {
-        case e: Exception =>
-          logError(s"Failed to replicate block to $peer", e)
+    // One by one choose a random peer and try uploading the block to it
+    // If replication fails (e.g., target peer is down), force the list of cached peers
+    // to be re-fetched from driver and then pick another random peer for replication. Also
+    // temporarily black list the peer for which replication failed.
+    //
+    // This selection of a peer and replication is continued in a loop until one of the
+    // following 3 conditions is fulfilled:
+    // (i) specified number of peers have been replicated to
+    // (ii) too many failures in replicating to peers
+    // (iii) no peer left to replicate to
+    //
+    while (!done) {
+      getRandomPeer() match {
+        case Some(peer) =>
+          try {
+            val onePeerStartTime = System.currentTimeMillis
+            data.rewind()
+            logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
+            blockTransferService.uploadBlockSync(
+              peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
+            logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms"
+              .format((System.currentTimeMillis - onePeerStartTime) /

[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-29 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18201286
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -787,31 +791,110 @@ private[spark] class BlockManager(
   }

   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+    peerFetchLock.synchronized {
+      val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+      val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+      if (cachedPeers == null || forceFetch || timeout) {
+        cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
+        lastPeerFetchTime = System.currentTimeMillis
+        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      }
+      cachedPeers
+    }
+  }
+
+  /**
    * Replicate block to another node.
    */
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+    val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
+    val numPeersToReplicateTo = level.replication - 1
+    val peersForReplication = new ArrayBuffer[BlockManagerId]
+    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
+    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
-    if (cachedPeers == null) {
-      cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+    val startTime = System.nanoTime
+    val random = new Random(blockId.hashCode)
+
+    var replicationFailed = false
+    var failures = 0
+    var done = false
+
+    // Get cached list of peers
+    peersForReplication ++= getPeers(forceFetch = false)
+
+    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
+    // So assuming the list of peers does not change and no replication failures,
+    // if there are multiple attempts in the same node to replicate the same block,
+    // the same set of peers will be selected.
+    def getRandomPeer(): Option[BlockManagerId] = {
+      // If replication had failed, then force update the cached list of peers and remove the peers
+      // that have been already used
+      if (replicationFailed) {
+        peersForReplication.clear()
+        peersForReplication ++= getPeers(forceFetch = true)
+        peersForReplication --= peersReplicatedTo
+        peersForReplication --= peersFailedToReplicateTo
+      }
+      if (!peersForReplication.isEmpty) {
+        Some(peersForReplication(random.nextInt(peersForReplication.size)))
+      } else {
+        None
+      }
     }
-    for (peer: BlockManagerId <- cachedPeers) {
-      val start = System.nanoTime
-      data.rewind()
-      logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
-        s"To node: $peer")

-      try {
-        blockTransferService.uploadBlockSync(
-          peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
-      } catch {
-        case e: Exception =>
-          logError(s"Failed to replicate block to $peer", e)
+    // One by one choose a random peer and try uploading the block to it
+    // If replication fails (e.g., target peer is down), force the list of cached peers
+    // to be re-fetched from driver and then pick another random peer for replication. Also
+    // temporarily black list the peer for which replication failed.
+    //
+    // This selection of a peer and replication is continued in a loop until one of the
+    // following 3 conditions is fulfilled:
+    // (i) specified number of peers have been replicated to
+    // (ii) too many failures in replicating to peers
+    // (iii) no peer left to replicate to
+    //
+    while (!done) {
+      getRandomPeer() match {
+        case Some(peer) =>
+          try {
+            val onePeerStartTime = System.currentTimeMillis
+            data.rewind()
+            logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
+            blockTransferService.uploadBlockSync(
+              peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
+            logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms"
+              .format((System.currentTimeMillis - onePeerStartTime) /

[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18233118
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala ---
@@ -59,6 +59,8 @@ class BlockManagerId private (
 
   def port: Int = port_
 
+  def isDriver = (executorId == "")
--- End diff --

Right!





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18233139
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }

   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+    val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+    val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+
+    cachedPeers.synchronized {
+      if (cachedPeers.isEmpty || forceFetch || timeout) {
+        cachedPeers.clear()
+        cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+        lastPeerFetchTime = System.currentTimeMillis
+        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      }
+    }
+    cachedPeers
+  }
+
+  /**
    * Replicate block to another node.
    */
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+    val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
+    val numPeersToReplicateTo = level.replication - 1
+    val peersReplicatedTo = new HashSet[BlockManagerId]
+    val peersFailedToReplicateTo = new HashSet[BlockManagerId]
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
-    if (cachedPeers == null) {
-      cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+    val startTime = System.nanoTime
--- End diff --

Sorry :P





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-57351691
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21043/





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-57352608
  
  [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21044/consoleFull) for PR 2366 at commit [`0661773`](https://github.com/apache/spark/commit/06617739863b3c79a7be91e83bf382336b03083e).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-57363077
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21044/consoleFull) for PR 2366 at commit [`0661773`](https://github.com/apache/spark/commit/06617739863b3c79a7be91e83bf382336b03083e).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-57363086
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21044/





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-30 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-57402676
  
@rxin This is good to go!





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-30 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18259626
  
--- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1228,4 +1244,314 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
     assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
   }
+
+  test("get peers with addition and removal of block managers") {
--- End diff --

should we create a new test suite BlockManagerReplicationSuite? This file 
is getting long.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-10-02 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-57676144
  
Jenkins, test this please.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-10-02 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-57676219
  
  [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21214/consoleFull) for PR 2366 at commit [`9690f57`](https://github.com/apache/spark/commit/9690f57be441ea15b3c9de040d6ba07bec262e22).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-10-02 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-57678711
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21215/





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-10-02 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-57692138
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21214/consoleFull) for PR 2366 at commit [`9690f57`](https://github.com/apache/spark/commit/9690f57be441ea15b3c9de040d6ba07bec262e22).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster`






[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-10-02 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-57692143
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21214/





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-10-02 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-57703944
  
Thanks. Merging in master.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-10-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/2366





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-11 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/2366

[SPARK-3495] Block replication fails continuously when the replication 
target node is dead

If a block manager (say, A) wants to replicate a block and the node chosen for replication (say, B) is dead, then the attempt to send the block to B fails. However, this continues to fail indefinitely. Even if the driver learns about the demise of B, A continues to try replicating to B and failing miserably.

The reason behind this bug is that A initially fetches a list of peers from 
the driver (when B was active), but never updates it after B is dead. This 
affects Spark Streaming as its receiver uses block replication.

The solution in this patch adds the following.
- Changed BlockManagerMaster to return all the peers of a block manager, 
rather than the requested number.
- Refactored BlockManager's replication code to handle peer caching 
correctly.
+ The peer for replication is randomly selected. This is different from 
past behavior where for a node A, a node B was deterministically chosen for the 
lifetime of the application.
+ If replication fails to one node, the peers are refetched.
+ The peer cache has a TTL of 1 second to enable discovery of new peers and their use for replication.
- Added replication unit tests (replication was not tested till now, duh!)

This should not make a difference in performance of Spark workloads where 
replication is not used.
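In short, the replication path described above looks roughly like the following self-contained sketch (a TTL-guarded peer cache, forced re-fetch plus temporary blacklisting on failure, and random target selection). The master and transfer layer are stubbed out, and names such as `fetchPeersFromMaster` and `uploadTo` are illustrative rather than the actual BlockManager API:

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object ReplicationSketch {
  type Peer = String

  private var cachedPeers: Seq[Peer] = null
  private var lastPeerFetchTime = 0L
  private val cachedPeersTtlMs = 60 * 1000L

  // Stand-in for BlockManagerMaster.getPeers(blockManagerId)
  private def fetchPeersFromMaster(): Seq[Peer] = Seq("exec-1", "exec-2", "exec-3")

  // Cached peer list, re-fetched when forced or when the TTL has expired
  def getPeers(forceFetch: Boolean): Seq[Peer] = synchronized {
    val expired = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtlMs
    if (cachedPeers == null || forceFetch || expired) {
      cachedPeers = fetchPeersFromMaster()
      lastPeerFetchTime = System.currentTimeMillis
    }
    cachedPeers
  }

  // Stand-in for blockTransferService.uploadBlockSync; returns false on failure
  private def uploadTo(peer: Peer): Boolean = true

  def replicate(blockIdHash: Int, numPeersToReplicateTo: Int, maxReplicationFailures: Int): Unit = {
    val random = new Random(blockIdHash) // selection is deterministic per block id
    val candidates = ArrayBuffer[Peer]() ++= getPeers(forceFetch = false)
    val replicatedTo = ArrayBuffer[Peer]()
    val failedPeers = ArrayBuffer[Peer]()
    var failures = 0
    while (replicatedTo.size < numPeersToReplicateTo &&
        failures <= maxReplicationFailures && candidates.nonEmpty) {
      val peer = candidates(random.nextInt(candidates.size))
      if (uploadTo(peer)) {
        replicatedTo += peer
        candidates -= peer
      } else {
        // On failure: count it, blacklist the peer, and re-fetch the peer list from the driver
        failures += 1
        failedPeers += peer
        candidates.clear()
        candidates ++= getPeers(forceFetch = true)
        candidates --= replicatedTo
        candidates --= failedPeers
      }
    }
  }
}
```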

@andrewor14 @JoshRosen 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark replication-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2366.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2366


commit af0c1daea8a22bca3b7826322205c887370ce247
Author: Tathagata Das 
Date:   2014-09-11T02:54:31Z

Added replication unit tests to BlockManagerSuite

commit 9f0ac9fb20660ff183490d13f4e3195b9283bc61
Author: Tathagata Das 
Date:   2014-09-11T08:44:18Z

Modified replication tests to fail on replication bug.

commit d081bf60e87689994a006603f84cb8f22ab19c6a
Author: Tathagata Das 
Date:   2014-09-11T20:58:14Z

Fixed bug in get peers and unit tests to test get-peers and replication 
under executor churn.

commit 03de02d532f51b23bc1b79fc76115aacbd64a4b1
Author: Tathagata Das 
Date:   2014-09-12T00:46:02Z

Change replication logic to correctly refetch peers from master on failure 
and on new worker addition.

commit 7598f913c52728f25b6bce91dd9ae6879105e261
Author: Tathagata Das 
Date:   2014-09-12T00:52:16Z

Minor changes.







[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-11 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-55351240
  
test this please





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-55351493
  
  [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20190/consoleFull) for PR 2366 at commit [`7598f91`](https://github.com/apache/spark/commit/7598f913c52728f25b6bce91dd9ae6879105e261).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-55352316
  
  [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20193/consoleFull) for PR 2366 at commit [`4a20531`](https://github.com/apache/spark/commit/4a205314cf1fc9a7a413bc1fb066fe2bb2c21932).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-11 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17460098
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -787,31 +789,87 @@ private[spark] class BlockManager(
   }

   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+    val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 1000) // milliseconds
--- End diff --

Is 1000 good enough?





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-55354916
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20190/consoleFull) for PR 2366 at commit [`7598f91`](https://github.com/apache/spark/commit/7598f913c52728f25b6bce91dd9ae6879105e261).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster`






[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-11 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-55355335
  
The GetPeers class signature has been changed from `case class GetPeers(blockManagerId: BlockManagerId, numPeers: Int)` to `case class GetPeers(blockManagerId: BlockManagerId)`. This is probably okay as this is a developer API.
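For reference, a minimal sketch of that change; the trait and id type are stubbed so the snippet stands alone, and the `Old` suffix exists only to show both shapes side by side:

```scala
// Stubs standing in for the real Spark types
trait ToBlockManagerMaster
case class BlockManagerId(executorId: String, host: String, port: Int)

// Old shape: the block manager asked the master for a fixed number of peers
case class GetPeersOld(blockManagerId: BlockManagerId, numPeers: Int) extends ToBlockManagerMaster

// New shape: the master returns all peers and the block manager picks targets itself
case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
```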





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-55355811
  
  [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20193/consoleFull) for PR 2366 at commit [`4a20531`](https://github.com/apache/spark/commit/4a205314cf1fc9a7a413bc1fb066fe2bb2c21932).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster`






[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-12 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-55372429
  
Isn't a 1 s cache span too low? How often will we get a cache hit if entries expire in 1 sec?





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-12 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17493889
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala 
---
@@ -59,6 +59,8 @@ class BlockManagerId private (
 
   def port: Int = port_
 
+  def isDriver = (executorId == "")
--- End diff --

+1 on this. I added this TODO a while back and I think this also affects 
some UI code.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-12 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-55447441
  
Yeah, 1 second is probably too low (though for streaming there will still be 4-5 cache hits for every miss). Better to have it be a minute. It's a tradeoff against how fast we want streaming to find new nodes.
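A back-of-the-envelope sketch of that tradeoff, assuming a streaming receiver stores one block per block interval (200 ms by default); the numbers are illustrative, not taken from the patch:

```scala
object PeerCacheTtlTradeoff {
  // Roughly how many blocks reuse a cached peer list before it expires and is re-fetched
  def hitsPerMiss(cachedPeersTtlMs: Long, blockIntervalMs: Long): Long =
    math.max(cachedPeersTtlMs / blockIntervalMs - 1, 0L)

  def main(args: Array[String]): Unit = {
    println(hitsPerMiss(1000, 200))   // ~4 hits per re-fetch with a 1 s TTL
    println(hitsPerMiss(60000, 200))  // ~299 hits per re-fetch with a 1 min TTL
  }
}
```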





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-12 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17497968
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala ---
@@ -403,16 +402,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     blockIds.map(blockId => getLocations(blockId))
   }

-  private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
-    val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-
-    val selfIndex = peers.indexOf(blockManagerId)
+  /** Get the list of the peers of the given block manager */
+  private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+    val blockManagerIds = blockManagerInfo.keySet.filterNot { _.isDriver }.toArray
+    val selfIndex = blockManagerIds.indexOf(blockManagerId)
--- End diff --

Why not just do `contains`? Then we don't need to convert this to an array and use `indexOf`.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-12 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17498010
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala ---
@@ -403,16 +402,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     blockIds.map(blockId => getLocations(blockId))
   }

-  private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
-    val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-
-    val selfIndex = peers.indexOf(blockManagerId)
+  /** Get the list of the peers of the given block manager */
+  private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+    val blockManagerIds = blockManagerInfo.keySet.filterNot { _.isDriver }.toArray
+    val selfIndex = blockManagerIds.indexOf(blockManagerId)
     if (selfIndex == -1) {
-      throw new SparkException("Self index for " + blockManagerId + " not found")
+      logError("Self index for " + blockManagerId + " not found")
+      Seq.empty
+    } else {
+      // If the blockManagerIds is [ id1 id2 id3 id4 id5 ] and the blockManagerId is id2
+      // Then this code will return the list [ id3 id4 id5 id1 ]
+      Array.tabulate[BlockManagerId](blockManagerIds.size - 1) { i =>
+        blockManagerIds((selfIndex + i + 1) % blockManagerIds.size)
--- End diff --

Here we can just subtract it from the set instead of doing this complicated 
logic, right?
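A minimal sketch of the suggested simplification, with the surrounding actor state stubbed out so it stands alone (the real method lives in BlockManagerMasterActor): membership is checked with `contains` and the requester is simply subtracted from the set, instead of rotating an array by index.

```scala
import scala.collection.mutable

object GetPeersSketch {
  case class BlockManagerId(executorId: String, host: String, port: Int) {
    def isDriver: Boolean = executorId == ""
  }

  // Stand-in for BlockManagerMasterActor.blockManagerInfo (only the keys matter here)
  val blockManagerInfo = mutable.HashMap[BlockManagerId, String]()

  def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
    val blockManagerIds = blockManagerInfo.keySet.filterNot(_.isDriver).toSet
    if (blockManagerIds.contains(blockManagerId)) {
      (blockManagerIds - blockManagerId).toSeq // no index arithmetic needed
    } else {
      Seq.empty // the real code would log an error for an unknown block manager
    }
  }
}
```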





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-12 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17498022
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala ---
@@ -403,16 +402,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     blockIds.map(blockId => getLocations(blockId))
   }

-  private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
-    val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-
-    val selfIndex = peers.indexOf(blockManagerId)
+  /** Get the list of the peers of the given block manager */
--- End diff --

Maybe add a comment to explain that this excludes self





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17498056
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala ---
@@ -403,16 +402,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     blockIds.map(blockId => getLocations(blockId))
   }

-  private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
-    val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-
-    val selfIndex = peers.indexOf(blockManagerId)
+  /** Get the list of the peers of the given block manager */
+  private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+    val blockManagerIds = blockManagerInfo.keySet.filterNot { _.isDriver }.toArray
+    val selfIndex = blockManagerIds.indexOf(blockManagerId)
--- End diff --

Yeah, I had just kept the existing code here as-is. Will change; that's obviously better.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-12 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17498083
  
--- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -62,6 +63,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   val securityMgr = new SecurityManager(conf)
   val mapOutputTracker = new MapOutputTrackerMaster(conf)
   val shuffleManager = new HashShuffleManager(conf)
+  val allStores = new ArrayBuffer[BlockManager]
--- End diff --

Can you add a comment explaining that we need this so we can ensure all prior block managers are stopped before starting each test?





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-12 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17498146
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala ---
@@ -403,16 +402,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     blockIds.map(blockId => getLocations(blockId))
   }

-  private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
-    val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-
-    val selfIndex = peers.indexOf(blockManagerId)
+  /** Get the list of the peers of the given block manager */
+  private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+    val blockManagerIds = blockManagerInfo.keySet.filterNot { _.isDriver }.toArray
+    val selfIndex = blockManagerIds.indexOf(blockManagerId)
     if (selfIndex == -1) {
-      throw new SparkException("Self index for " + blockManagerId + " not found")
+      logError("Self index for " + blockManagerId + " not found")
+      Seq.empty
+    } else {
+      // If the blockManagerIds is [ id1 id2 id3 id4 id5 ] and the blockManagerId is id2
+      // Then this code will return the list [ id3 id4 id5 id1 ]
+      Array.tabulate[BlockManagerId](blockManagerIds.size - 1) { i =>
+        blockManagerIds((selfIndex + i + 1) % blockManagerIds.size)
--- End diff --

or through `filterNot`





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17498158
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala ---
@@ -403,16 +402,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     blockIds.map(blockId => getLocations(blockId))
   }

-  private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
-    val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-
-    val selfIndex = peers.indexOf(blockManagerId)
+  /** Get the list of the peers of the given block manager */
+  private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+    val blockManagerIds = blockManagerInfo.keySet.filterNot { _.isDriver }.toArray
+    val selfIndex = blockManagerIds.indexOf(blockManagerId)
     if (selfIndex == -1) {
-      throw new SparkException("Self index for " + blockManagerId + " not found")
+      logError("Self index for " + blockManagerId + " not found")
+      Seq.empty
+    } else {
+      // If the blockManagerIds is [ id1 id2 id3 id4 id5 ] and the blockManagerId is id2
+      // Then this code will return the list [ id3 id4 id5 id1 ]
+      Array.tabulate[BlockManagerId](blockManagerIds.size - 1) { i =>
+        blockManagerIds((selfIndex + i + 1) % blockManagerIds.size)
--- End diff --

Again, I had just preserved the existing logic here. However, since I am changing the overall logic from deterministic peer selection (where I wanted to maintain the order in which peers are returned, so that no two nodes use the same third node as a replication target) to random peer selection, I think your suggestion makes sense.
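For comparison, a small self-contained sketch of the two selection strategies being discussed; peers are plain strings here and the names are illustrative:

```scala
import scala.util.Random

object PeerSelectionSketch {
  // Old style: rotate the peer list past the requester, so a given node always
  // prefers the same next nodes for the lifetime of the application
  def deterministicPeers(peers: IndexedSeq[String], selfIndex: Int): IndexedSeq[String] =
    IndexedSeq.tabulate(peers.size - 1)(i => peers((selfIndex + i + 1) % peers.size))

  // New style: pick a random peer seeded by the block id's hash, so retries of the
  // same block on the same node choose the same peers (absent failures), while
  // different blocks spread their replicas across the cluster
  def randomPeer(peers: IndexedSeq[String], blockIdHash: Int): Option[String] = {
    val random = new Random(blockIdHash)
    if (peers.nonEmpty) Some(peers(random.nextInt(peers.size))) else None
  }
}
```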





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-12 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17498282
  
--- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
     assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
   }
+
+  test("get peers with store addition and removal") {
--- End diff --

Mind calling this BlockManager instead of store? I think most people don't actually call it a store.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-12 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17498486
  
--- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
     assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
   }
+
+  test("get peers with store addition and removal") {
+    val numStores = 4
+    val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") }
+    val storeIds = stores.map { _.blockManagerId }.toSet
+    assert(master.getPeers(stores(0).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(0).blockManagerId })
+    assert(master.getPeers(stores(1).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(1).blockManagerId })
+    assert(master.getPeers(stores(2).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(2).blockManagerId })
+
+    // Add driver store and test whether it is filtered out
+    val driverStore = makeBlockManager(1000, "")
+    assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver))
+    assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver))
+    assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver))
+
+    // Add a new store and test whether get peers returns it
+    val newStore = makeBlockManager(1000, s"store$numStores")
+    assert(master.getPeers(stores(0).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(0).blockManagerId } + newStore.blockManagerId)
+    assert(master.getPeers(stores(1).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(1).blockManagerId } + newStore.blockManagerId)
+    assert(master.getPeers(stores(2).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(2).blockManagerId } + newStore.blockManagerId)
+    assert(master.getPeers(newStore.blockManagerId).toSet === storeIds)
+
+    // Remove a store and test whether get peers returns it
+    val storeIdToRemove = stores(0).blockManagerId
+    master.removeExecutor(storeIdToRemove.executorId)
+    assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove))
+    assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove))
+    assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove))
+
+    // Test whether asking for peers of a unregistered block manager id returns empty list
+    assert(master.getPeers(stores(0).blockManagerId).isEmpty)
+    assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty)
+  }
+
+  test("block replication - 2x") {
+    testReplication(2,
+      Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2)
+    )
+  }
+
+  test("block replication - 3x") {
+    // Generate storage levels with 3x replication
+    val storageLevels = {
+      Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map {
+        level => StorageLevel(
+          level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 3)
+      }
+    }
+    testReplication(3, storageLevels)
+  }
+
+  test("block replication - mixed between 1x to 5x") {
+    // Generate storage levels with varying replication
+    val storageLevels = Seq(
+      MEMORY_ONLY,
+      MEMORY_ONLY_SER_2,
+      StorageLevel(true, false, false, false, 3),
+      StorageLevel(true, true, false, true, 4),
+      StorageLevel(true, true, false, false, 5),
+      StorageLevel(true, true, false, true, 4),
+      StorageLevel(true, false, false, false, 3),
+      MEMORY_ONLY_SER_2,
+      MEMORY_ONLY
+    )
+    testReplication(5, storageLevels)
+  }
+
+  test("block replication with addition and deletion of executors") {
+    val blockSize = 1000
+    val storeSize = 1
+    val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") }
+
+    def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) {
+      try {
+        initialStores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+        assert(master.getLocations(blockId).size === expectedNumLocations)
+      } finally {
+        master.removeBlock(blockId)
+      }
--- End diff --

What's the point of this finally? If an assertion fails, don't we just move 
on to the next test?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if th

[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-12 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17498517
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with 
Matchers with BeforeAndAfter
 assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
 assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
   }
+
+  test("get peers with store addition and removal") {
+val numStores = 4
+val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, 
s"store$i") }
+val storeIds = stores.map { _.blockManagerId }.toSet
+assert(master.getPeers(stores(0).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(0).blockManagerId })
+assert(master.getPeers(stores(1).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(1).blockManagerId })
+assert(master.getPeers(stores(2).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(2).blockManagerId })
+
+// Add driver store and test whether it is filtered out
+val driverStore = makeBlockManager(1000, "")
+assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver))
+assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver))
+assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver))
+
+// Add a new store and test whether get peers returns it
+val newStore = makeBlockManager(1000, s"store$numStores")
+assert(master.getPeers(stores(0).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(0).blockManagerId } + 
newStore.blockManagerId)
+assert(master.getPeers(stores(1).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(1).blockManagerId } + 
newStore.blockManagerId)
+assert(master.getPeers(stores(2).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(2).blockManagerId } + 
newStore.blockManagerId)
+assert(master.getPeers(newStore.blockManagerId).toSet === storeIds)
+
+// Remove a store and test whether get peers returns it
+val storeIdToRemove = stores(0).blockManagerId
+master.removeExecutor(storeIdToRemove.executorId)
+
assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove))
+
assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove))
+
assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove))
+
+// Test whether asking for peers of a unregistered block manager id 
returns empty list
+assert(master.getPeers(stores(0).blockManagerId).isEmpty)
+assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty)
+  }
+
+  test("block replication - 2x") {
+testReplication(2,
+  Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, 
MEMORY_AND_DISK_SER_2)
+)
+  }
+
+  test("block replication - 3x") {
+// Generate storage levels with 3x replication
+val storageLevels = {
+  Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, 
MEMORY_AND_DISK_SER).map {
+level => StorageLevel(
+  level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 3)
+  }
+}
+testReplication(3, storageLevels)
+  }
+
+  test("block replication - mixed between 1x to 5x") {
+// Generate storage levels with varying replication
+val storageLevels = Seq(
+  MEMORY_ONLY,
+  MEMORY_ONLY_SER_2,
+  StorageLevel(true, false, false, false, 3),
+  StorageLevel(true, true, false, true, 4),
+  StorageLevel(true, true, false, false, 5),
+  StorageLevel(true, true, false, true, 4),
+  StorageLevel(true, false, false, false, 3),
+  MEMORY_ONLY_SER_2,
+  MEMORY_ONLY
+)
+testReplication(5, storageLevels)
+  }
+
+  test("block replication with addition and deletion of executors") {
+val blockSize = 1000
+val storeSize = 1
+val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, 
s"store$i") }
+
+def testPut(blockId: String, storageLevel: StorageLevel, 
expectedNumLocations: Int) {
+  try {
+initialStores(0).putSingle(blockId, new Array[Byte](blockSize), 
storageLevel)
+assert(master.getLocations(blockId).size === expectedNumLocations)
+  } finally {
+master.removeBlock(blockId)
+  }
+}
+
+// 2x replication should work, 3x replication should only replicate 2x
+testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2)
+testPut("a2", StorageLevel(true, true, false, true, 3), 2)
+
+// Add another store, 3x replication should work now, 4x replication 
should

[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-12 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17498587
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with 
Matchers with BeforeAndAfter
 assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
 assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
   }
+
+  test("get peers with store addition and removal") {
+val numStores = 4
+val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, 
s"store$i") }
+val storeIds = stores.map { _.blockManagerId }.toSet
+assert(master.getPeers(stores(0).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(0).blockManagerId })
+assert(master.getPeers(stores(1).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(1).blockManagerId })
+assert(master.getPeers(stores(2).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(2).blockManagerId })
+
+// Add driver store and test whether it is filtered out
+val driverStore = makeBlockManager(1000, "")
+assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver))
+assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver))
+assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver))
+
+// Add a new store and test whether get peers returns it
+val newStore = makeBlockManager(1000, s"store$numStores")
+assert(master.getPeers(stores(0).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(0).blockManagerId } + 
newStore.blockManagerId)
+assert(master.getPeers(stores(1).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(1).blockManagerId } + 
newStore.blockManagerId)
+assert(master.getPeers(stores(2).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(2).blockManagerId } + 
newStore.blockManagerId)
+assert(master.getPeers(newStore.blockManagerId).toSet === storeIds)
+
+// Remove a store and test whether get peers returns it
+val storeIdToRemove = stores(0).blockManagerId
+master.removeExecutor(storeIdToRemove.executorId)
+
assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove))
+
assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove))
+
assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove))
+
+// Test whether asking for peers of a unregistered block manager id 
returns empty list
+assert(master.getPeers(stores(0).blockManagerId).isEmpty)
+assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty)
+  }
+
+  test("block replication - 2x") {
+testReplication(2,
+  Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, 
MEMORY_AND_DISK_SER_2)
+)
+  }
+
+  test("block replication - 3x") {
+// Generate storage levels with 3x replication
+val storageLevels = {
+  Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, 
MEMORY_AND_DISK_SER).map {
+level => StorageLevel(
+  level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 3)
+  }
+}
+testReplication(3, storageLevels)
+  }
+
+  test("block replication - mixed between 1x to 5x") {
+// Generate storage levels with varying replication
+val storageLevels = Seq(
+  MEMORY_ONLY,
+  MEMORY_ONLY_SER_2,
+  StorageLevel(true, false, false, false, 3),
+  StorageLevel(true, true, false, true, 4),
+  StorageLevel(true, true, false, false, 5),
+  StorageLevel(true, true, false, true, 4),
+  StorageLevel(true, false, false, false, 3),
+  MEMORY_ONLY_SER_2,
+  MEMORY_ONLY
+)
+testReplication(5, storageLevels)
+  }
+
+  test("block replication with addition and deletion of executors") {
+val blockSize = 1000
+val storeSize = 1
+val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, 
s"store$i") }
+
+def testPut(blockId: String, storageLevel: StorageLevel, 
expectedNumLocations: Int) {
+  try {
+initialStores(0).putSingle(blockId, new Array[Byte](blockSize), 
storageLevel)
+assert(master.getLocations(blockId).size === expectedNumLocations)
+  } finally {
+master.removeBlock(blockId)
+  }
+}
+
+// 2x replication should work, 3x replication should only replicate 2x
+testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2)
+testPut("a2", StorageLevel(true, true, false, true, 3), 2)
+
+// Add another store, 3x replication should work now, 4x replication 
should

[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-12 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17498639
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with 
Matchers with BeforeAndAfter
 assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
 assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
   }
+
+  test("get peers with store addition and removal") {
+val numStores = 4
+val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, 
s"store$i") }
+val storeIds = stores.map { _.blockManagerId }.toSet
+assert(master.getPeers(stores(0).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(0).blockManagerId })
+assert(master.getPeers(stores(1).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(1).blockManagerId })
+assert(master.getPeers(stores(2).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(2).blockManagerId })
+
+// Add driver store and test whether it is filtered out
+val driverStore = makeBlockManager(1000, "")
+assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver))
+assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver))
+assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver))
+
+// Add a new store and test whether get peers returns it
+val newStore = makeBlockManager(1000, s"store$numStores")
+assert(master.getPeers(stores(0).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(0).blockManagerId } + 
newStore.blockManagerId)
+assert(master.getPeers(stores(1).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(1).blockManagerId } + 
newStore.blockManagerId)
+assert(master.getPeers(stores(2).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(2).blockManagerId } + 
newStore.blockManagerId)
+assert(master.getPeers(newStore.blockManagerId).toSet === storeIds)
+
+// Remove a store and test whether get peers returns it
+val storeIdToRemove = stores(0).blockManagerId
+master.removeExecutor(storeIdToRemove.executorId)
+
assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove))
+
assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove))
+
assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove))
+
+// Test whether asking for peers of a unregistered block manager id 
returns empty list
+assert(master.getPeers(stores(0).blockManagerId).isEmpty)
+assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty)
+  }
+
+  test("block replication - 2x") {
+testReplication(2,
+  Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, 
MEMORY_AND_DISK_SER_2)
+)
+  }
+
+  test("block replication - 3x") {
+// Generate storage levels with 3x replication
+val storageLevels = {
+  Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, 
MEMORY_AND_DISK_SER).map {
+level => StorageLevel(
+  level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 3)
+  }
+}
+testReplication(3, storageLevels)
+  }
+
+  test("block replication - mixed between 1x to 5x") {
+// Generate storage levels with varying replication
+val storageLevels = Seq(
+  MEMORY_ONLY,
+  MEMORY_ONLY_SER_2,
+  StorageLevel(true, false, false, false, 3),
+  StorageLevel(true, true, false, true, 4),
+  StorageLevel(true, true, false, false, 5),
+  StorageLevel(true, true, false, true, 4),
+  StorageLevel(true, false, false, false, 3),
+  MEMORY_ONLY_SER_2,
+  MEMORY_ONLY
+)
+testReplication(5, storageLevels)
+  }
+
+  test("block replication with addition and deletion of executors") {
+val blockSize = 1000
+val storeSize = 1
+val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, 
s"store$i") }
+
+def testPut(blockId: String, storageLevel: StorageLevel, 
expectedNumLocations: Int) {
+  try {
+initialStores(0).putSingle(blockId, new Array[Byte](blockSize), 
storageLevel)
+assert(master.getLocations(blockId).size === expectedNumLocations)
+  } finally {
+master.removeBlock(blockId)
+  }
+}
+
+// 2x replication should work, 3x replication should only replicate 2x
+testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2)
+testPut("a2", StorageLevel(true, true, false, true, 3), 2)
+
+// Add another store, 3x replication should work now, 4x replication 
should

[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-12 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-55451730
  
I looked at the logic in `BlockManager` in detail and it looks reasonable. 
I left a few minor comments all over the place, but in general this LGTM.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-12 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-55472830
  
What happens when a recomputation results in the same blockId getting 
regenerated (unpersist followed by recomputation/persist, block drop followed 
by recomputation, or something else)? It will now go to some random node, 
potentially not the same one selected previously? Resulting in 
over-replication?

A more corner case: if the computation was not idempotent and resulted in a 
changed dataset for the block, earlier it would simply get overwritten as part 
of replication; will we now have two nodes with the same data and a third (the 
one initially replicated to) which can diverge?

Btw, from what I saw, node loss is not handled, right? So a block can get 
under-replicated? Would be nice if we added that some day ...


Streaming is not the only application for replication :-) We use it in 
conjunction with locality wait levels to speed up computation when speculative 
execution is enabled.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-56293506
  
@mridulm Very good thoughts! I totally agree that replication is not only 
for streaming, and the implications of this patch in other scenarios is 
important to understand. Let me address them one by one.

*1. Over-replication: *
I can see that there is a chance that some sequence of events can cause a 
block to be replicated to another node even though enough replicas already 
exist. But I am trying to construct a scenario which would lead to 
over-replication with this patch BUT does not lead to over-replication in 
current Spark. That is, I want to make sure that there is no regression in the 
behavior. 
Note that choosing nodes randomly can lead to more uniform memory usage in 
the cluster, so there is definitely an advantage to this patch. However, to 
preserve the previous behavior we can make the randomization deterministic 
with respect to the block id. So if a block is recomputed on the same node 
where it had existed, it will get replicated to the same replication target as 
before. For each block, this is no different from the current behavior, and 
hence should not create a regression in terms of the chances of 
over-replication. How does that sound?
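
A minimal sketch of that idea (plain strings stand in for BlockManagerIds; the 
actual patch works on the cached peer list inside BlockManager):

```scala
import scala.util.Random

// Seeding the RNG with the block id makes the "random" choice repeatable:
// if the same block is recomputed on the same node, the same target peer is
// chosen again, so the over-replication behavior matches current Spark.
def chooseTarget(blockId: String, peers: Seq[String]): Option[String] = {
  if (peers.isEmpty) None
  else {
    val random = new Random(blockId.hashCode)
    Some(peers(random.nextInt(peers.size)))
  }
}
```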

*2. Non-idempotent computations: *
For such operations, there are other issues in the existing Spark as well. In 
the current block manager's put behavior, if a block already exists locally, 
it is not re-added, as the block contents are assumed to be identical. As a 
result, with current Spark you can potentially have two nodes with two 
different versions of the same block. One can argue that this patch increases 
the chances of that. The fix I proposed above should take care of it; the 
probability of that happening won't be any different.

*3. Under-replication:*
Yes, node loss was not and is still not handled, and is outside the scope of 
this patch. It sure would be nice to add it some day, but that definitely adds 
further complexity to the BlockManager.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-56293724
  
@tdas handling (1) deterministically will make (2) in line with what we 
currently have.
And that should be sufficient imo.

(3) was not in the context of this patch, but is a general shortcoming of 
Spark currently.
Alleviating (3) might be complicated (not sure how much so), but it would have 
some very interesting consequences for performance (among others).

For example: this currently prevents us from using block persistence for 
checkpointing. There was a discussion about this in a JIRA a while back 
(forgot id) ... resolving this, together with 3x replicated blocks, would give 
us really cheap and very performant checkpointing (while having fault 
tolerance on par with HDFS).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17825042
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala ---
@@ -403,16 +402,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, 
conf: SparkConf, listenerBus
 blockIds.map(blockId => getLocations(blockId))
   }
 
-  private def getPeers(blockManagerId: BlockManagerId, size: Int): 
Seq[BlockManagerId] = {
-val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-
-val selfIndex = peers.indexOf(blockManagerId)
+  /** Get the list of the peers of the given block manager */
--- End diff --

"peers" automatically means that it does not include the self. It should be 
obvious.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17825062
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with 
Matchers with BeforeAndAfter
 assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
 assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
   }
+
+  test("get peers with store addition and removal") {
--- End diff --

Changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17825064
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -62,6 +63,7 @@ class BlockManagerSuite extends FunSuite with Matchers 
with BeforeAndAfter
   val securityMgr = new SecurityManager(conf)
   val mapOutputTracker = new MapOutputTrackerMaster(conf)
   val shuffleManager = new HashShuffleManager(conf)
+  val allStores = new ArrayBuffer[BlockManager]
--- End diff --

Added.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17825538
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with 
Matchers with BeforeAndAfter
 assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
 assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
   }
+
+  test("get peers with store addition and removal") {
+val numStores = 4
+val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, 
s"store$i") }
+val storeIds = stores.map { _.blockManagerId }.toSet
+assert(master.getPeers(stores(0).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(0).blockManagerId })
+assert(master.getPeers(stores(1).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(1).blockManagerId })
+assert(master.getPeers(stores(2).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(2).blockManagerId })
+
+// Add driver store and test whether it is filtered out
+val driverStore = makeBlockManager(1000, "")
+assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver))
+assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver))
+assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver))
+
+// Add a new store and test whether get peers returns it
+val newStore = makeBlockManager(1000, s"store$numStores")
+assert(master.getPeers(stores(0).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(0).blockManagerId } + 
newStore.blockManagerId)
+assert(master.getPeers(stores(1).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(1).blockManagerId } + 
newStore.blockManagerId)
+assert(master.getPeers(stores(2).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(2).blockManagerId } + 
newStore.blockManagerId)
+assert(master.getPeers(newStore.blockManagerId).toSet === storeIds)
+
+// Remove a store and test whether get peers returns it
+val storeIdToRemove = stores(0).blockManagerId
+master.removeExecutor(storeIdToRemove.executorId)
+
assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove))
+
assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove))
+
assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove))
+
+// Test whether asking for peers of a unregistered block manager id 
returns empty list
+assert(master.getPeers(stores(0).blockManagerId).isEmpty)
+assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty)
+  }
+
+  test("block replication - 2x") {
+testReplication(2,
+  Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, 
MEMORY_AND_DISK_SER_2)
+)
+  }
+
+  test("block replication - 3x") {
+// Generate storage levels with 3x replication
+val storageLevels = {
+  Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, 
MEMORY_AND_DISK_SER).map {
+level => StorageLevel(
+  level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 3)
+  }
+}
+testReplication(3, storageLevels)
+  }
+
+  test("block replication - mixed between 1x to 5x") {
+// Generate storage levels with varying replication
+val storageLevels = Seq(
+  MEMORY_ONLY,
+  MEMORY_ONLY_SER_2,
+  StorageLevel(true, false, false, false, 3),
+  StorageLevel(true, true, false, true, 4),
+  StorageLevel(true, true, false, false, 5),
+  StorageLevel(true, true, false, true, 4),
+  StorageLevel(true, false, false, false, 3),
+  MEMORY_ONLY_SER_2,
+  MEMORY_ONLY
+)
+testReplication(5, storageLevels)
+  }
+
+  test("block replication with addition and deletion of executors") {
+val blockSize = 1000
+val storeSize = 1
+val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, 
s"store$i") }
+
+def testPut(blockId: String, storageLevel: StorageLevel, 
expectedNumLocations: Int) {
+  try {
+initialStores(0).putSingle(blockId, new Array[Byte](blockSize), 
storageLevel)
+assert(master.getLocations(blockId).size === expectedNumLocations)
+  } finally {
+master.removeBlock(blockId)
+  }
--- End diff --

This is confusing. The reason there is a `try...finally` is that this 
function `testPut` can be tried over and over again inside an `eventually` 
block. So if the assertion fails, the system does not simply move on to the 
next unit test; it needs to be reset by removing the block that was just put, 
so that the next attempt starts from a clean state.
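
A rough sketch of the intended usage (the block id, storage level, and 
expected count below are illustrative; `eventually` is ScalaTest's retry 
helper):

```scala
import org.scalatest.concurrent.Eventually._

// eventually retries the body until it passes or the patience runs out; the
// finally inside testPut removes the block, so every retry starts from a
// clean state instead of seeing locations left over from a previous attempt.
eventually {
  testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2)
}
```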

[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17825562
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with 
Matchers with BeforeAndAfter
 assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
 assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
   }
+
+  test("get peers with store addition and removal") {
+val numStores = 4
+val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, 
s"store$i") }
+val storeIds = stores.map { _.blockManagerId }.toSet
+assert(master.getPeers(stores(0).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(0).blockManagerId })
+assert(master.getPeers(stores(1).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(1).blockManagerId })
+assert(master.getPeers(stores(2).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(2).blockManagerId })
+
+// Add driver store and test whether it is filtered out
+val driverStore = makeBlockManager(1000, "")
+assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver))
+assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver))
+assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver))
+
+// Add a new store and test whether get peers returns it
+val newStore = makeBlockManager(1000, s"store$numStores")
+assert(master.getPeers(stores(0).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(0).blockManagerId } + 
newStore.blockManagerId)
+assert(master.getPeers(stores(1).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(1).blockManagerId } + 
newStore.blockManagerId)
+assert(master.getPeers(stores(2).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(2).blockManagerId } + 
newStore.blockManagerId)
+assert(master.getPeers(newStore.blockManagerId).toSet === storeIds)
+
+// Remove a store and test whether get peers returns it
+val storeIdToRemove = stores(0).blockManagerId
+master.removeExecutor(storeIdToRemove.executorId)
+
assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove))
+
assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove))
+
assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove))
+
+// Test whether asking for peers of a unregistered block manager id 
returns empty list
+assert(master.getPeers(stores(0).blockManagerId).isEmpty)
+assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty)
+  }
+
+  test("block replication - 2x") {
+testReplication(2,
+  Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, 
MEMORY_AND_DISK_SER_2)
+)
+  }
+
+  test("block replication - 3x") {
+// Generate storage levels with 3x replication
+val storageLevels = {
+  Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, 
MEMORY_AND_DISK_SER).map {
+level => StorageLevel(
+  level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 3)
+  }
+}
+testReplication(3, storageLevels)
+  }
+
+  test("block replication - mixed between 1x to 5x") {
+// Generate storage levels with varying replication
+val storageLevels = Seq(
+  MEMORY_ONLY,
+  MEMORY_ONLY_SER_2,
+  StorageLevel(true, false, false, false, 3),
+  StorageLevel(true, true, false, true, 4),
+  StorageLevel(true, true, false, false, 5),
+  StorageLevel(true, true, false, true, 4),
+  StorageLevel(true, false, false, false, 3),
+  MEMORY_ONLY_SER_2,
+  MEMORY_ONLY
+)
+testReplication(5, storageLevels)
+  }
+
+  test("block replication with addition and deletion of executors") {
+val blockSize = 1000
+val storeSize = 1
+val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, 
s"store$i") }
+
+def testPut(blockId: String, storageLevel: StorageLevel, 
expectedNumLocations: Int) {
+  try {
+initialStores(0).putSingle(blockId, new Array[Byte](blockSize), 
storageLevel)
+assert(master.getLocations(blockId).size === expectedNumLocations)
+  } finally {
+master.removeBlock(blockId)
+  }
+}
+
+// 2x replication should work, 3x replication should only replicate 2x
+testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2)
+testPut("a2", StorageLevel(true, true, false, true, 3), 2)
+
+// Add another store, 3x replication should work now, 4x replication 
should only 

[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17825572
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with 
Matchers with BeforeAndAfter
 assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
 assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
   }
+
+  test("get peers with store addition and removal") {
+val numStores = 4
+val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, 
s"store$i") }
+val storeIds = stores.map { _.blockManagerId }.toSet
+assert(master.getPeers(stores(0).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(0).blockManagerId })
+assert(master.getPeers(stores(1).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(1).blockManagerId })
+assert(master.getPeers(stores(2).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(2).blockManagerId })
+
+// Add driver store and test whether it is filtered out
+val driverStore = makeBlockManager(1000, "")
+assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver))
+assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver))
+assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver))
+
+// Add a new store and test whether get peers returns it
+val newStore = makeBlockManager(1000, s"store$numStores")
+assert(master.getPeers(stores(0).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(0).blockManagerId } + 
newStore.blockManagerId)
+assert(master.getPeers(stores(1).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(1).blockManagerId } + 
newStore.blockManagerId)
+assert(master.getPeers(stores(2).blockManagerId).toSet ===
+  storeIds.filterNot { _ == stores(2).blockManagerId } + 
newStore.blockManagerId)
+assert(master.getPeers(newStore.blockManagerId).toSet === storeIds)
+
+// Remove a store and test whether get peers returns it
+val storeIdToRemove = stores(0).blockManagerId
+master.removeExecutor(storeIdToRemove.executorId)
+
assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove))
+
assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove))
+
assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove))
+
+// Test whether asking for peers of a unregistered block manager id 
returns empty list
+assert(master.getPeers(stores(0).blockManagerId).isEmpty)
+assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty)
+  }
+
+  test("block replication - 2x") {
+testReplication(2,
+  Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, 
MEMORY_AND_DISK_SER_2)
+)
+  }
+
+  test("block replication - 3x") {
+// Generate storage levels with 3x replication
+val storageLevels = {
+  Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, 
MEMORY_AND_DISK_SER).map {
+level => StorageLevel(
+  level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 3)
+  }
+}
+testReplication(3, storageLevels)
+  }
+
+  test("block replication - mixed between 1x to 5x") {
+// Generate storage levels with varying replication
+val storageLevels = Seq(
+  MEMORY_ONLY,
+  MEMORY_ONLY_SER_2,
+  StorageLevel(true, false, false, false, 3),
+  StorageLevel(true, true, false, true, 4),
+  StorageLevel(true, true, false, false, 5),
+  StorageLevel(true, true, false, true, 4),
+  StorageLevel(true, false, false, false, 3),
+  MEMORY_ONLY_SER_2,
+  MEMORY_ONLY
+)
+testReplication(5, storageLevels)
+  }
+
+  test("block replication with addition and deletion of executors") {
+val blockSize = 1000
+val storeSize = 1
+val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, 
s"store$i") }
+
+def testPut(blockId: String, storageLevel: StorageLevel, 
expectedNumLocations: Int) {
+  try {
+initialStores(0).putSingle(blockId, new Array[Byte](blockSize), 
storageLevel)
+assert(master.getLocations(blockId).size === expectedNumLocations)
+  } finally {
+master.removeBlock(blockId)
+  }
+}
+
+// 2x replication should work, 3x replication should only replicate 2x
+testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2)
+testPut("a2", StorageLevel(true, true, false, true, 3), 2)
+
+// Add another store, 3x replication should work now, 4x replication 
should only 

[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-56311396
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20634/consoleFull)
 for   PR 2366 at commit 
[`3821ab9`](https://github.com/apache/spark/commit/3821ab971bcc85b182288f9039bf38da0acedece).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-56313463
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20634/consoleFull)
 for   PR 2366 at commit 
[`3821ab9`](https://github.com/apache/spark/commit/3821ab971bcc85b182288f9039bf38da0acedece).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  case class GetPeers(blockManagerId: BlockManagerId) extends 
ToBlockManagerMaster`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-56317365
  
@mridulm I implemented (1) and also added a unit test for that behavior.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-56317539
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20636/consoleFull)
 for   PR 2366 at commit 
[`08afaa9`](https://github.com/apache/spark/commit/08afaa94e0672ae60bee6737c040ec0d9de9d268).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17829791
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
--- End diff --

Can you rename this to updatePeersFromMaster? The current name seems to 
suggest it is a really cheap getter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-56319636
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20636/consoleFull)
 for   PR 2366 at commit 
[`08afaa9`](https://github.com/apache/spark/commit/08afaa94e0672ae60bee6737c040ec0d9de9d268).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  case class GetPeers(blockManagerId: BlockManagerId) extends 
ToBlockManagerMaster`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17829854
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
--- End diff --

Also update the comment to say more, e.g. that this fetches an updated list 
of peers from the driver.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17829869
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -111,6 +111,8 @@ private[spark] class BlockManager(
 MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, 
conf)
   private val broadcastCleaner = new MetadataCleaner(
 MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
+  private val cachedPeers = new HashSet[BlockManagerId]
--- End diff --

When do you ever remove entries from here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17829908
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -111,6 +111,8 @@ private[spark] class BlockManager(
 MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, 
conf)
   private val broadcastCleaner = new MetadataCleaner(
 MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
+  private val cachedPeers = new HashSet[BlockManagerId]
--- End diff --

Ah, I see, you clear the hashset further down ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17829919
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 
1000) // milliseconds
+val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+
+cachedPeers.synchronized {
--- End diff --

You should also do the timeout check inside the synchronized block, because 
otherwise two racing callers can both decide the cache is stale and 
immediately send two requests to the driver, and those requests are fairly 
expensive.
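
A minimal sketch of that suggestion, reusing the names from the quoted diff 
(so this is a fragment of the method under review, not standalone code):

```scala
private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
  cachedPeers.synchronized {
    // Do the staleness check while holding the lock, so two racing callers
    // cannot both conclude the cache is stale and both hit the driver.
    val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000)
    val expired = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
    if (cachedPeers.isEmpty || forceFetch || expired) {
      cachedPeers.clear()
      cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
      lastPeerFetchTime = System.currentTimeMillis
    }
  }
  cachedPeers
}
```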


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17829945
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
--- End diff --

Actually, never mind my renaming comment, since this uses the cache. You 
should still update the comment to state what this does, though.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17829956
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 
1000) // milliseconds
+val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+
+cachedPeers.synchronized {
+  if (cachedPeers.isEmpty || forceFetch || timeout) {
+cachedPeers.clear()
+cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+}
+cachedPeers
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersReplicatedTo = new HashSet[BlockManagerId]
+val peersFailedToReplicateTo = new HashSet[BlockManagerId]
 val tLevel = StorageLevel(
   level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 1)
-if (cachedPeers == null) {
-  cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+val startTime = System.nanoTime
+val random = new Random(blockId.hashCode)
+
+var forceFetchPeers = false
+var failures = 0
+var done = false
+
+// Get a random peer
+def getRandomPeer(): Option[BlockManagerId] = {
+  val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- 
peersFailedToReplicateTo
+  if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) 
else None
--- End diff --

the toSeq seems expensive?
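
One way to avoid it, as a sketch (plain strings stand in for BlockManagerIds; 
this is an alternative, not what the patch does):

```scala
import scala.util.Random

// Walk the set's iterator to a random offset instead of copying the whole
// set into a Seq on every call.
def samplePeer(peers: Set[String], random: Random): Option[String] =
  if (peers.isEmpty) None
  else Some(peers.iterator.drop(random.nextInt(peers.size)).next())
```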


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17829979
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 
1000) // milliseconds
+val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+
+cachedPeers.synchronized {
+  if (cachedPeers.isEmpty || forceFetch || timeout) {
+cachedPeers.clear()
+cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+}
+cachedPeers
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersReplicatedTo = new HashSet[BlockManagerId]
+val peersFailedToReplicateTo = new HashSet[BlockManagerId]
 val tLevel = StorageLevel(
   level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 1)
-if (cachedPeers == null) {
-  cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+val startTime = System.nanoTime
+val random = new Random(blockId.hashCode)
+
+var forceFetchPeers = false
+var failures = 0
+var done = false
+
+// Get a random peer
+def getRandomPeer(): Option[BlockManagerId] = {
+  val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- 
peersFailedToReplicateTo
+  if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) 
else None
 }
-for (peer: BlockManagerId <- cachedPeers) {
-  val start = System.nanoTime
-  data.rewind()
-  logDebug(s"Try to replicate $blockId once; The size of the data is 
${data.limit()} Bytes. " +
-s"To node: $peer")
 
-  try {
-blockTransferService.uploadBlockSync(
-  peer.host, peer.port, blockId.toString, new 
NioByteBufferManagedBuffer(data), tLevel)
-  } catch {
-case e: Exception =>
-  logError(s"Failed to replicate block to $peer", e)
+// One by one choose a random peer and try uploading the block to it
+// If replication fails (e.g., target peer is down), force the list of 
cached peers
+// to be re-fetched from driver and then pick another random peer for 
replication. Also
+// temporarily black list the peer for which replication failed.
+while (!done) {
--- End diff --

Can we get rid of `done` and just use
```scala
while (peersReplicatedTo.size < numPeersToReplicateTo && failures <= maxReplicationFailures) {
```

Otherwise we have to track every place where `done` is updated to find out when the loop terminates.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17830017
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 
1000) // milliseconds
+val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+
+cachedPeers.synchronized {
+  if (cachedPeers.isEmpty || forceFetch || timeout) {
+cachedPeers.clear()
+cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+}
+cachedPeers
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersReplicatedTo = new HashSet[BlockManagerId]
+val peersFailedToReplicateTo = new HashSet[BlockManagerId]
 val tLevel = StorageLevel(
   level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 1)
-if (cachedPeers == null) {
-  cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+val startTime = System.nanoTime
+val random = new Random(blockId.hashCode)
+
+var forceFetchPeers = false
+var failures = 0
+var done = false
+
+// Get a random peer
+def getRandomPeer(): Option[BlockManagerId] = {
+  val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- 
peersFailedToReplicateTo
+  if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) 
else None
 }
-for (peer: BlockManagerId <- cachedPeers) {
-  val start = System.nanoTime
-  data.rewind()
-  logDebug(s"Try to replicate $blockId once; The size of the data is 
${data.limit()} Bytes. " +
-s"To node: $peer")
 
-  try {
-blockTransferService.uploadBlockSync(
-  peer.host, peer.port, blockId.toString, new 
NioByteBufferManagedBuffer(data), tLevel)
-  } catch {
-case e: Exception =>
-  logError(s"Failed to replicate block to $peer", e)
+// One by one choose a random peer and try uploading the block to it
+// If replication fails (e.g., target peer is down), force the list of 
cached peers
+// to be re-fetched from driver and then pick another random peer for 
replication. Also
+// temporarily black list the peer for which replication failed.
+while (!done) {
--- End diff --

Ah, it's trickier to handle the None case.

OK, in that case let's keep `done`, but do comment explicitly on the three conditions under which the loop terminates.
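
For readers following along, here is a self-contained sketch of the loop shape being discussed, with the three termination conditions marked. This is not the PR's code: `Int` stands in for `BlockManagerId`, `tryUpload` for the real `uploadBlockSync` call, and the seed is arbitrary.

```scala
import scala.collection.mutable.HashSet
import scala.util.Random

object ReplicationLoopSketch {
  def replicateSketch(allPeers: Set[Int], numPeersToReplicateTo: Int,
                      maxReplicationFailures: Int, tryUpload: Int => Boolean): Set[Int] = {
    val rng = new Random(42)
    val peersReplicatedTo = HashSet[Int]()
    val peersFailedToReplicateTo = HashSet[Int]()
    var failures = 0
    var done = false

    def getRandomPeer(): Option[Int] = {
      val candidates = (allPeers -- peersReplicatedTo -- peersFailedToReplicateTo).toSeq
      if (candidates.nonEmpty) Some(candidates(rng.nextInt(candidates.size))) else None
    }

    while (!done) {
      getRandomPeer() match {
        case Some(peer) =>
          if (tryUpload(peer)) {
            peersReplicatedTo += peer
            if (peersReplicatedTo.size == numPeersToReplicateTo) done = true // (1) replicated to enough peers
          } else {
            failures += 1
            peersFailedToReplicateTo += peer
            if (failures > maxReplicationFailures) done = true              // (2) too many failures
          }
        case None => done = true                                            // (3) no peers left to try
      }
    }
    peersReplicatedTo.toSet
  }
}
```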


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17830025
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 
1000) // milliseconds
+val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+
+cachedPeers.synchronized {
+  if (cachedPeers.isEmpty || forceFetch || timeout) {
+cachedPeers.clear()
+cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+}
+cachedPeers
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersReplicatedTo = new HashSet[BlockManagerId]
+val peersFailedToReplicateTo = new HashSet[BlockManagerId]
 val tLevel = StorageLevel(
   level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 1)
-if (cachedPeers == null) {
-  cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+val startTime = System.nanoTime
--- End diff --

Why nanoTime instead of currentTimeMillis?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17831655
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
--- End diff --

`updatePeersFromMaster` is a misnomer, as it does not actually query the master unless the parameter is true or the TTL has expired (which I have increased to 60 seconds). Most of the time it is a cheap operation, so I am not sure what's best here.

I am totally open to more suggestions. How about keeping it `getPeers` with the param name as `forceUpdateFromMaster`?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17833363
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 
1000) // milliseconds
+val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+
+cachedPeers.synchronized {
+  if (cachedPeers.isEmpty || forceFetch || timeout) {
+cachedPeers.clear()
+cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+}
+cachedPeers
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersReplicatedTo = new HashSet[BlockManagerId]
+val peersFailedToReplicateTo = new HashSet[BlockManagerId]
 val tLevel = StorageLevel(
   level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 1)
-if (cachedPeers == null) {
-  cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+val startTime = System.nanoTime
+val random = new Random(blockId.hashCode)
+
+var forceFetchPeers = false
+var failures = 0
+var done = false
+
+// Get a random peer
+def getRandomPeer(): Option[BlockManagerId] = {
+  val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- 
peersFailedToReplicateTo
+  if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) 
else None
--- End diff --

This would mean that, assuming there is no change to the number of executors in the system, we will consistently get the same peer back.
But if even some unrelated peer is added or removed, we won't get back the same peer we replicated to last time.

Did I read that correctly, or am I missing something? Thanks.
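
To make the determinism concrete, a tiny REPL-style illustration (plain Scala, not BlockManager code, with a hypothetical block id string): two Randoms seeded with the same block hash draw identical index sequences, so an unchanged, identically ordered peer list yields the same pick on every attempt.

```scala
import scala.util.Random

// Same seed => same sequence of indices, hence the same peer if the candidate
// list (and its ordering) has not changed between replication attempts.
val seed = "rdd_0_0".hashCode  // hypothetical block id
val a = new Random(seed)
val b = new Random(seed)
assert((1 to 5).map(_ => a.nextInt(100)) == (1 to 5).map(_ => b.nextInt(100)))
```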


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17833383
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 
1000) // milliseconds
+val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+
+cachedPeers.synchronized {
+  if (cachedPeers.isEmpty || forceFetch || timeout) {
+cachedPeers.clear()
+cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+}
+cachedPeers
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersReplicatedTo = new HashSet[BlockManagerId]
+val peersFailedToReplicateTo = new HashSet[BlockManagerId]
 val tLevel = StorageLevel(
   level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 1)
-if (cachedPeers == null) {
-  cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+val startTime = System.nanoTime
+val random = new Random(blockId.hashCode)
+
+var forceFetchPeers = false
+var failures = 0
+var done = false
+
+// Get a random peer
+def getRandomPeer(): Option[BlockManagerId] = {
+  val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- 
peersFailedToReplicateTo
+  if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) 
else None
 }
-for (peer: BlockManagerId <- cachedPeers) {
-  val start = System.nanoTime
-  data.rewind()
-  logDebug(s"Try to replicate $blockId once; The size of the data is 
${data.limit()} Bytes. " +
-s"To node: $peer")
 
-  try {
-blockTransferService.uploadBlockSync(
-  peer.host, peer.port, blockId.toString, new 
NioByteBufferManagedBuffer(data), tLevel)
-  } catch {
-case e: Exception =>
-  logError(s"Failed to replicate block to $peer", e)
+// One by one choose a random peer and try uploading the block to it
+// If replication fails (e.g., target peer is down), force the list of 
cached peers
+// to be re-fetched from driver and then pick another random peer for 
replication. Also
+// temporarily black list the peer for which replication failed.
+while (!done) {
+  getRandomPeer() match {
+case Some(peer) =>
+  try {
+val onePeerStartTime = System.nanoTime
+data.rewind()
+logTrace(s"Trying to replicate $blockId of ${data.limit()} 
bytes to $peer")
+blockTransferService.uploadBlockSync(
+  peer.host, peer.port, blockId.toString, new 
NioByteBufferManagedBuffer(data), tLevel)
+logTrace(s"Replicated $blockId of ${data.limit()} bytes to 
$peer in %f ms"
+  .format((System.nanoTime - onePeerStartTime) / 1e6))
+peersReplicatedTo += peer
+forceFetchPeers = false
+if (peersReplicatedTo.size == numPeersToReplicateTo) {
+  done = true
+}
+  } catch {
+case e: Exception =>
+  logWarning(s"Failed to replicate $blockId to $peer, failure 
#$failures", e)
+  failures += 1
+  forceFetchPeers = true
+  peersFailedToReplicateTo += peer
--- End diff --

Ideally, we might want to cache this peersFailedToReplicateTo across block updates for a short TTL (to temporarily blacklist replication to that peer).
But that can be done in a future PR.
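
Purely as an illustration of the idea (hypothetical, not part of this PR), such a short-lived blacklist might look roughly like this; `blacklistTtlMs` and the `String` peer ids are assumed placeholders.

```scala
import scala.collection.mutable

// Remember failed peers with an expiry timestamp and filter them out of the
// candidate list until the TTL passes.
class PeerBlacklistSketch(blacklistTtlMs: Long) {
  private val expiry = mutable.HashMap[String, Long]()   // peerId -> expiry time (ms)

  def markFailed(peerId: String, nowMs: Long = System.currentTimeMillis): Unit =
    expiry(peerId) = nowMs + blacklistTtlMs

  def isBlacklisted(peerId: String, nowMs: Long = System.currentTimeMillis): Boolean =
    expiry.get(peerId).exists(_ > nowMs)
}
```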


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17833419
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 
1000) // milliseconds
+val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+
+cachedPeers.synchronized {
+  if (cachedPeers.isEmpty || forceFetch || timeout) {
+cachedPeers.clear()
+cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+}
+cachedPeers
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersReplicatedTo = new HashSet[BlockManagerId]
+val peersFailedToReplicateTo = new HashSet[BlockManagerId]
 val tLevel = StorageLevel(
   level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 1)
-if (cachedPeers == null) {
-  cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+val startTime = System.nanoTime
+val random = new Random(blockId.hashCode)
+
+var forceFetchPeers = false
+var failures = 0
+var done = false
+
+// Get a random peer
+def getRandomPeer(): Option[BlockManagerId] = {
+  val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- 
peersFailedToReplicateTo
+  if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) 
else None
 }
-for (peer: BlockManagerId <- cachedPeers) {
-  val start = System.nanoTime
-  data.rewind()
-  logDebug(s"Try to replicate $blockId once; The size of the data is 
${data.limit()} Bytes. " +
-s"To node: $peer")
 
-  try {
-blockTransferService.uploadBlockSync(
-  peer.host, peer.port, blockId.toString, new 
NioByteBufferManagedBuffer(data), tLevel)
-  } catch {
-case e: Exception =>
-  logError(s"Failed to replicate block to $peer", e)
+// One by one choose a random peer and try uploading the block to it
+// If replication fails (e.g., target peer is down), force the list of 
cached peers
+// to be re-fetched from driver and then pick another random peer for 
replication. Also
+// temporarily black list the peer for which replication failed.
+while (!done) {
+  getRandomPeer() match {
+case Some(peer) =>
+  try {
+val onePeerStartTime = System.nanoTime
+data.rewind()
+logTrace(s"Trying to replicate $blockId of ${data.limit()} 
bytes to $peer")
+blockTransferService.uploadBlockSync(
+  peer.host, peer.port, blockId.toString, new 
NioByteBufferManagedBuffer(data), tLevel)
+logTrace(s"Replicated $blockId of ${data.limit()} bytes to 
$peer in %f ms"
+  .format((System.nanoTime - onePeerStartTime) / 1e6))
+peersReplicatedTo += peer
+forceFetchPeers = false
+if (peersReplicatedTo.size == numPeersToReplicateTo) {
+  done = true
+}
+  } catch {
+case e: Exception =>
+  logWarning(s"Failed to replicate $blockId to $peer, failure 
#$failures", e)
+  failures += 1
+  forceFetchPeers = true
+  peersFailedToReplicateTo += peer
--- End diff --

Btw, curious - will replication fail only when the remote peer is dead (and so requires forceFetchPeers)?
What about an inability to add the block on the remote peer? Will that cause an exception to be raised here?

Essentially I am trying to understand if an Exception raised here always means the remote peer is 'dead'.
An alternative might be to list peers which have at least data.rewind().remaining() space available, but we don't support that iirc (and the space can get used up before we make this call anyway, I guess).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.

[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17833483
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 
1000) // milliseconds
+val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+
+cachedPeers.synchronized {
+  if (cachedPeers.isEmpty || forceFetch || timeout) {
+cachedPeers.clear()
+cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+}
+cachedPeers
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersReplicatedTo = new HashSet[BlockManagerId]
+val peersFailedToReplicateTo = new HashSet[BlockManagerId]
 val tLevel = StorageLevel(
   level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 1)
-if (cachedPeers == null) {
-  cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+val startTime = System.nanoTime
+val random = new Random(blockId.hashCode)
+
+var forceFetchPeers = false
+var failures = 0
+var done = false
+
+// Get a random peer
+def getRandomPeer(): Option[BlockManagerId] = {
+  val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- 
peersFailedToReplicateTo
+  if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) 
else None
 }
-for (peer: BlockManagerId <- cachedPeers) {
-  val start = System.nanoTime
-  data.rewind()
-  logDebug(s"Try to replicate $blockId once; The size of the data is 
${data.limit()} Bytes. " +
-s"To node: $peer")
 
-  try {
-blockTransferService.uploadBlockSync(
-  peer.host, peer.port, blockId.toString, new 
NioByteBufferManagedBuffer(data), tLevel)
-  } catch {
-case e: Exception =>
-  logError(s"Failed to replicate block to $peer", e)
+// One by one choose a random peer and try uploading the block to it
+// If replication fails (e.g., target peer is down), force the list of 
cached peers
+// to be re-fetched from driver and then pick another random peer for 
replication. Also
+// temporarily black list the peer for which replication failed.
+while (!done) {
+  getRandomPeer() match {
+case Some(peer) =>
+  try {
+val onePeerStartTime = System.nanoTime
+data.rewind()
+logTrace(s"Trying to replicate $blockId of ${data.limit()} 
bytes to $peer")
+blockTransferService.uploadBlockSync(
+  peer.host, peer.port, blockId.toString, new 
NioByteBufferManagedBuffer(data), tLevel)
+logTrace(s"Replicated $blockId of ${data.limit()} bytes to 
$peer in %f ms"
+  .format((System.nanoTime - onePeerStartTime) / 1e6))
+peersReplicatedTo += peer
+forceFetchPeers = false
+if (peersReplicatedTo.size == numPeersToReplicateTo) {
+  done = true
+}
+  } catch {
+case e: Exception =>
+  logWarning(s"Failed to replicate $blockId to $peer, failure 
#$failures", e)
+  failures += 1
+  forceFetchPeers = true
+  peersFailedToReplicateTo += peer
+  if (failures > maxReplicationFailures) {
+done = true
+  }
+  }
+case None =>
+  // no peer left to replicate to
+  done = true
--- End diff --

What if the initial list had only self in the executor list and we are within the TTL (so getPeers returns an empty list) - at bootstrapping time, for example.
Do we want to check whether the master has updates for us? This will kind of hose our TTL though... but maybe it is a corner case.

Or is this handled already? Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.

[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17902630
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 
1000) // milliseconds
+val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+
+cachedPeers.synchronized {
--- End diff --

Good catch!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17904273
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 
1000) // milliseconds
+val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+
+cachedPeers.synchronized {
+  if (cachedPeers.isEmpty || forceFetch || timeout) {
+cachedPeers.clear()
+cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+}
+cachedPeers
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersReplicatedTo = new HashSet[BlockManagerId]
+val peersFailedToReplicateTo = new HashSet[BlockManagerId]
 val tLevel = StorageLevel(
   level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 1)
-if (cachedPeers == null) {
-  cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+val startTime = System.nanoTime
+val random = new Random(blockId.hashCode)
+
+var forceFetchPeers = false
+var failures = 0
+var done = false
+
+// Get a random peer
+def getRandomPeer(): Option[BlockManagerId] = {
+  val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- 
peersFailedToReplicateTo
+  if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) 
else None
--- End diff --

@mridulm You read that correctly, but @rxin's point is different. He is wondering whether converting the `HashSet` to a `Seq` (so that we can select one by index) every time we select a peer could be expensive if the set of peers is large (say, in a cluster with 1000s of nodes). I am not entirely convinced that the computation is going to be expensive even if the list of nodes is O(1000), but I am still going to make an attempt to make this more efficient. In fact, the repeated calculation of `peers` in line 828 every time a peer needs to be selected can also be avoided.
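
As one possible direction (a sketch only, not necessarily what this PR ends up doing), the per-pick `Seq` rebuild can be avoided by materializing the candidates once and removing entries as they are used or fail, so each selection is a constant-time index plus one removal.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Pick a random candidate and drop it from the buffer so it is not considered again.
def pickAndRemove(candidates: ArrayBuffer[String], rng: Random): Option[String] =
  if (candidates.isEmpty) None
  else {
    val idx = rng.nextInt(candidates.size)
    Some(candidates.remove(idx))
  }
```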


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17906928
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 
1000) // milliseconds
+val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+
+cachedPeers.synchronized {
+  if (cachedPeers.isEmpty || forceFetch || timeout) {
+cachedPeers.clear()
+cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+}
+cachedPeers
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersReplicatedTo = new HashSet[BlockManagerId]
+val peersFailedToReplicateTo = new HashSet[BlockManagerId]
 val tLevel = StorageLevel(
   level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 1)
-if (cachedPeers == null) {
-  cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+val startTime = System.nanoTime
+val random = new Random(blockId.hashCode)
+
+var forceFetchPeers = false
+var failures = 0
+var done = false
+
+// Get a random peer
+def getRandomPeer(): Option[BlockManagerId] = {
+  val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- 
peersFailedToReplicateTo
+  if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) 
else None
 }
-for (peer: BlockManagerId <- cachedPeers) {
-  val start = System.nanoTime
-  data.rewind()
-  logDebug(s"Try to replicate $blockId once; The size of the data is 
${data.limit()} Bytes. " +
-s"To node: $peer")
 
-  try {
-blockTransferService.uploadBlockSync(
-  peer.host, peer.port, blockId.toString, new 
NioByteBufferManagedBuffer(data), tLevel)
-  } catch {
-case e: Exception =>
-  logError(s"Failed to replicate block to $peer", e)
+// One by one choose a random peer and try uploading the block to it
+// If replication fails (e.g., target peer is down), force the list of 
cached peers
+// to be re-fetched from driver and then pick another random peer for 
replication. Also
+// temporarily black list the peer for which replication failed.
+while (!done) {
--- End diff --

I added comments before the while loop, as well as at all three places where `done` is set to `true`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17907153
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 
1000) // milliseconds
+val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+
+cachedPeers.synchronized {
+  if (cachedPeers.isEmpty || forceFetch || timeout) {
+cachedPeers.clear()
+cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+}
+cachedPeers
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersReplicatedTo = new HashSet[BlockManagerId]
+val peersFailedToReplicateTo = new HashSet[BlockManagerId]
 val tLevel = StorageLevel(
   level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 1)
-if (cachedPeers == null) {
-  cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+val startTime = System.nanoTime
--- End diff --

Because the existing code did so. Changing to millis.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17916865
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 
1000) // milliseconds
+val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+
+cachedPeers.synchronized {
+  if (cachedPeers.isEmpty || forceFetch || timeout) {
+cachedPeers.clear()
+cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+}
+cachedPeers
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersReplicatedTo = new HashSet[BlockManagerId]
+val peersFailedToReplicateTo = new HashSet[BlockManagerId]
 val tLevel = StorageLevel(
   level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 1)
-if (cachedPeers == null) {
-  cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+val startTime = System.nanoTime
+val random = new Random(blockId.hashCode)
+
+var forceFetchPeers = false
+var failures = 0
+var done = false
+
+// Get a random peer
+def getRandomPeer(): Option[BlockManagerId] = {
+  val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- 
peersFailedToReplicateTo
+  if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) 
else None
 }
-for (peer: BlockManagerId <- cachedPeers) {
-  val start = System.nanoTime
-  data.rewind()
-  logDebug(s"Try to replicate $blockId once; The size of the data is 
${data.limit()} Bytes. " +
-s"To node: $peer")
 
-  try {
-blockTransferService.uploadBlockSync(
-  peer.host, peer.port, blockId.toString, new 
NioByteBufferManagedBuffer(data), tLevel)
-  } catch {
-case e: Exception =>
-  logError(s"Failed to replicate block to $peer", e)
+// One by one choose a random peer and try uploading the block to it
+// If replication fails (e.g., target peer is down), force the list of 
cached peers
+// to be re-fetched from driver and then pick another random peer for 
replication. Also
+// temporarily black list the peer for which replication failed.
+while (!done) {
+  getRandomPeer() match {
+case Some(peer) =>
+  try {
+val onePeerStartTime = System.nanoTime
+data.rewind()
+logTrace(s"Trying to replicate $blockId of ${data.limit()} 
bytes to $peer")
+blockTransferService.uploadBlockSync(
+  peer.host, peer.port, blockId.toString, new 
NioByteBufferManagedBuffer(data), tLevel)
+logTrace(s"Replicated $blockId of ${data.limit()} bytes to 
$peer in %f ms"
+  .format((System.nanoTime - onePeerStartTime) / 1e6))
+peersReplicatedTo += peer
+forceFetchPeers = false
+if (peersReplicatedTo.size == numPeersToReplicateTo) {
+  done = true
+}
+  } catch {
+case e: Exception =>
+  logWarning(s"Failed to replicate $blockId to $peer, failure 
#$failures", e)
+  failures += 1
+  forceFetchPeers = true
+  peersFailedToReplicateTo += peer
--- End diff --

I agree that there may be other reasons for a failure to send to a remote node. Even in those cases, the current behavior of re-fetching the peer list and sending to another node is correct, just not the most efficient. This optimization is something that can be addressed in a future PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17921361
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 
1000) // milliseconds
+val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+
+cachedPeers.synchronized {
+  if (cachedPeers.isEmpty || forceFetch || timeout) {
+cachedPeers.clear()
+cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+}
+cachedPeers
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersReplicatedTo = new HashSet[BlockManagerId]
+val peersFailedToReplicateTo = new HashSet[BlockManagerId]
 val tLevel = StorageLevel(
   level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 1)
-if (cachedPeers == null) {
-  cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+val startTime = System.nanoTime
+val random = new Random(blockId.hashCode)
+
+var forceFetchPeers = false
+var failures = 0
+var done = false
+
+// Get a random peer
+def getRandomPeer(): Option[BlockManagerId] = {
+  val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- 
peersFailedToReplicateTo
+  if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) 
else None
 }
-for (peer: BlockManagerId <- cachedPeers) {
-  val start = System.nanoTime
-  data.rewind()
-  logDebug(s"Try to replicate $blockId once; The size of the data is 
${data.limit()} Bytes. " +
-s"To node: $peer")
 
-  try {
-blockTransferService.uploadBlockSync(
-  peer.host, peer.port, blockId.toString, new 
NioByteBufferManagedBuffer(data), tLevel)
-  } catch {
-case e: Exception =>
-  logError(s"Failed to replicate block to $peer", e)
+// One by one choose a random peer and try uploading the block to it
+// If replication fails (e.g., target peer is down), force the list of 
cached peers
+// to be re-fetched from driver and then pick another random peer for 
replication. Also
+// temporarily black list the peer for which replication failed.
+while (!done) {
+  getRandomPeer() match {
+case Some(peer) =>
+  try {
+val onePeerStartTime = System.nanoTime
+data.rewind()
+logTrace(s"Trying to replicate $blockId of ${data.limit()} 
bytes to $peer")
+blockTransferService.uploadBlockSync(
+  peer.host, peer.port, blockId.toString, new 
NioByteBufferManagedBuffer(data), tLevel)
+logTrace(s"Replicated $blockId of ${data.limit()} bytes to 
$peer in %f ms"
+  .format((System.nanoTime - onePeerStartTime) / 1e6))
+peersReplicatedTo += peer
+forceFetchPeers = false
+if (peersReplicatedTo.size == numPeersToReplicateTo) {
+  done = true
+}
+  } catch {
+case e: Exception =>
+  logWarning(s"Failed to replicate $blockId to $peer, failure 
#$failures", e)
+  failures += 1
+  forceFetchPeers = true
+  peersFailedToReplicateTo += peer
+  if (failures > maxReplicationFailures) {
+done = true
+  }
+  }
+case None =>
+  // no peer left to replicate to
+  done = true
--- End diff --

The initial list won't ever have self, as the BlockManagerMaster returns the list of nodes excluding the id of the node requesting it (that is, self). Nonetheless, getPeers can return an empty list (e.g., in local mode, with only one BlockManager), and that can also happen at bootstrapping time. However, current Spark already suffers from this problem - in fact it's much worse: currently, the peer list is only fetched once from the master (upon the first replication).

[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-56553233
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20711/consoleFull)
 for   PR 2366 at commit 
[`89f91a0`](https://github.com/apache/spark/commit/89f91a0109bd7d8b988bcaf800170bd82e1678f7).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-23 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17924427
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 
1000) // milliseconds
+val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+
+cachedPeers.synchronized {
--- End diff --

Ah yes, good point.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-23 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-56561115
  
@rxin, @mridulm I have slightly changed the logic to select one peer at 
random to make it more efficient. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-56563234
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20711/consoleFull)
 for   PR 2366 at commit 
[`89f91a0`](https://github.com/apache/spark/commit/89f91a0109bd7d8b988bcaf800170bd82e1678f7).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  case class GetPeers(blockManagerId: BlockManagerId) extends 
ToBlockManagerMaster`
  * `logInfo("Interrupting user class to stop.")`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-56563250
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20711/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17927862
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +790,111 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): 
mutable.HashSet[BlockManagerId] = {
+cachedPeers.synchronized {
+  val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 
* 1000) // milliseconds
+  val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+  if (cachedPeers.isEmpty || forceFetch || timeout) {
+cachedPeers.clear()
+cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+}
+cachedPeers
+  }
--- End diff --

There is an MT bug here.
Since cachedPeers is updated in place, it is possible for a 'previous' invocation to still be using cachedPeers while the next invocation is clearing/updating it.

We can avoid that by overwriting the cachedPeers instance variable with the result of master.getPeers.
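
To make the race concrete, a minimal self-contained sketch (not the actual BlockManager code) of how in-place mutation of a shared mutable set can bite a concurrent reader:

```scala
import scala.collection.mutable

object InPlaceRefreshHazard {
  val sharedPeers = mutable.HashSet("peerA", "peerB", "peerC")

  def refreshInPlace(fresh: Seq[String]): Unit = sharedPeers.synchronized {
    sharedPeers.clear()      // a reader holding sharedPeers now sees an empty set...
    sharedPeers ++= fresh    // ...until this line completes
  }

  // A caller doing `sharedPeers -- alreadyUsed` outside the synchronized block
  // (as getRandomPeer does with the set returned by getPeers) can therefore
  // observe a transiently empty or half-refreshed collection.
}
```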


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-23 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-56566367
  
@tdas In case I did not mention it before :-) this is definitely a great improvement over what existed earlier!
I would love it if we could (sometime soon, I hope) add support for re-replication of blocks due to lost executors, which currently seems to be outside the scope of this PR.

Other than the MT bug I mentioned above, this looks good to me!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-23 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-56573235
  
If this is getting more complicated, we should consider standardizing the internal API and then building a separate service that properly handles all these issues. That service can also handle serving shuffle blocks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18117922
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +790,111 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): 
mutable.HashSet[BlockManagerId] = {
+cachedPeers.synchronized {
+  val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 
* 1000) // milliseconds
+  val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+  if (cachedPeers.isEmpty || forceFetch || timeout) {
+cachedPeers.clear()
+cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+}
+cachedPeers
+  }
--- End diff --

Good point. Then we would need a separate locking object for synchronizing 
this. 
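
For reference, a minimal sketch of the shape that fix takes (mirroring the getPeers diff quoted further down in this thread, with `String` standing in for `BlockManagerId` and an injected fetch function): a dedicated lock object guards the refresh, and the cache is a `@volatile` reference to an immutable Seq that is replaced wholesale rather than mutated in place.

```scala
class PeerCacheSketch(ttlMs: Long, fetchFromMaster: () => Seq[String]) {
  private val peerFetchLock = new Object
  @volatile private var cachedPeers: Seq[String] = null
  private var lastPeerFetchTime = 0L

  def getPeers(forceFetch: Boolean): Seq[String] = peerFetchLock.synchronized {
    val expired = System.currentTimeMillis - lastPeerFetchTime > ttlMs
    if (cachedPeers == null || forceFetch || expired) {
      cachedPeers = fetchFromMaster().sortBy(_.hashCode)  // swap the reference, never mutate in place
      lastPeerFetchTime = System.currentTimeMillis
    }
    cachedPeers
  }
}
```

Readers always see either the old snapshot or the new one, never a partially cleared collection.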


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-57032133
  
  [QA tests have 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20888/consoleFull)
 for   PR 2366 at commit 
[`012afa3`](https://github.com/apache/spark/commit/012afa32f0f009e43eb6da28036087cc5264a7b3).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-57035730
  
  [QA tests have 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20888/consoleFull)
 for   PR 2366 at commit 
[`012afa3`](https://github.com/apache/spark/commit/012afa32f0f009e43eb6da28036087cc5264a7b3).
 * This patch **passes** unit tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  case class GetPeers(blockManagerId: BlockManagerId) extends 
ToBlockManagerMaster`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-57035733
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20888/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-26 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-57042546
  
@rxin I don't think it is getting more complicated than the status quo. The complexity of fetching and caching peers is contained in this one method, `getPeers`, so it should be reasonably self-contained and easy to replace with a different implementation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-26 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-57042584
  
No, I was responding to Mridul's rebalancing suggestion.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-26 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18122027
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +791,110 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+peerFetchLock.synchronized {
+  val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 
* 1000) // milliseconds
--- End diff --

This is a constant, so why not just put it outside of this function?
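
In code, that suggestion would amount to roughly the following field on BlockManager (a sketch, not the merged change):

```scala
// Read the TTL once at construction rather than on every getPeers call.
private val cachedPeersTtl: Int =
  conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
```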





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-26 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18122030
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -111,6 +112,9 @@ private[spark] class BlockManager(
 MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, 
conf)
   private val broadcastCleaner = new MetadataCleaner(
 MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
+  @volatile private var cachedPeers: Seq[BlockManagerId] = _
--- End diff --

Add a blank line so we have some logical separation. Even better if you can 
add an inline comment ...
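
Concretely, something like the following (a sketch of the suggested comment, not the merged code):

```scala
// Peers fetched from the BlockManagerMaster, refreshed on replication
// failure or TTL expiry; access is guarded by peerFetchLock in getPeers.
@volatile private var cachedPeers: Seq[BlockManagerId] = _
```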





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-26 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18122031
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +791,110 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+peerFetchLock.synchronized {
+  val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 
* 1000) // milliseconds
--- End diff --

Actually, it's probably no big deal to leave this here.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-26 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18122034
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +791,110 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+peerFetchLock.synchronized {
+  val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 
* 1000) // milliseconds
+  val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+  if (cachedPeers == null || forceFetch || timeout) {
+cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+  cachedPeers
+}
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersForReplication = new ArrayBuffer[BlockManagerId]
--- End diff --

Not that big of a deal, but maybe you can reduce the initial size of the 
array buffer to 3.
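
In other words, roughly this (a sketch; `ArrayBuffer` does accept an initial-size constructor argument):

```scala
// Replication targets only level.replication - 1 peers (usually one or two),
// so a small initial capacity avoids growing the backing array.
val peersForReplication = new ArrayBuffer[BlockManagerId](3)
```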





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-26 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r18122036
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 
1000) // milliseconds
+val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
+
+cachedPeers.synchronized {
+  if (cachedPeers.isEmpty || forceFetch || timeout) {
+cachedPeers.clear()
+cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+lastPeerFetchTime = System.currentTimeMillis
+logDebug("Fetched peers from master: " + cachedPeers.mkString("[", 
",", "]"))
+  }
+}
+cachedPeers
+  }
+
+  /**
* Replicate block to another node.
*/
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: 
StorageLevel): Unit = {
+val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+val numPeersToReplicateTo = level.replication - 1
+val peersReplicatedTo = new HashSet[BlockManagerId]
+val peersFailedToReplicateTo = new HashSet[BlockManagerId]
 val tLevel = StorageLevel(
   level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 1)
-if (cachedPeers == null) {
-  cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+val startTime = System.nanoTime
--- End diff --

I guess this hasn't happened yet? :p
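
Presumably `startTime` is meant to feed a timing log once replication completes; a sketch of how it would typically be used (the log wording is illustrative):

```scala
val startTime = System.nanoTime
// ... attempt replication to the selected peers ...
logDebug(s"Replicating $blockId took ${(System.nanoTime - startTime) / 1e6} ms")
```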




