[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18200248 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +791,110 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { +peerFetchLock.synchronized { + val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds --- End diff -- I didn't want to pollute the namespace inside the BlockManager class any more than absolutely necessary. :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18200286 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -111,6 +112,9 @@ private[spark] class BlockManager( MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf) private val broadcastCleaner = new MetadataCleaner( MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf) + @volatile private var cachedPeers: Seq[BlockManagerId] = _ --- End diff -- Good idea. Should have done that myself. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18200328 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +791,110 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { +peerFetchLock.synchronized { + val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds + val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + if (cachedPeers == null || forceFetch || timeout) { +cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } + cachedPeers +} + } + + /** * Replicate block to another node. --- End diff -- Done. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18201177 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +791,110 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { +peerFetchLock.synchronized { + val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds + val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + if (cachedPeers == null || forceFetch || timeout) { +cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } + cachedPeers +} + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersForReplication = new ArrayBuffer[BlockManagerId] --- End diff -- Why 3? This will be as large as `cluster size - 1`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18201208 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +791,110 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { +peerFetchLock.synchronized { + val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds + val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + if (cachedPeers == null || forceFetch || timeout) { +cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } + cachedPeers +} + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersForReplication = new ArrayBuffer[BlockManagerId] --- End diff -- nvm ignore that comment --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18201230 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +791,110 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { +peerFetchLock.synchronized { + val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds + val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + if (cachedPeers == null || forceFetch || timeout) { +cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } + cachedPeers +} + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersForReplication = new ArrayBuffer[BlockManagerId] +val peersReplicatedTo = new ArrayBuffer[BlockManagerId] +val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime +val random = new Random(blockId.hashCode) + +var replicationFailed = false +var failures = 0 +var done = false + +// Get cached list of peers +peersForReplication ++= getPeers(forceFetch = false) + +// Get a random peer. Note that this selection of a peer is deterministic on the block id. 
+// So assuming the list of peers does not change and no replication failures, +// if there are multiple attempts in the same node to replicate the same block, +// the same set of peers will be selected. +def getRandomPeer(): Option[BlockManagerId] = { + // If replication had failed, then force update the cached list of peers and remove the peers + // that have been already used + if (replicationFailed) { +peersForReplication.clear() +peersForReplication ++= getPeers(forceFetch = true) +peersForReplication --= peersReplicatedTo +peersForReplication --= peersFailedToReplicateTo + } + if (!peersForReplication.isEmpty) { +Some(peersForReplication(random.nextInt(peersForReplication.size))) + } else { +None + } } -for (peer: BlockManagerId <- cachedPeers) { - val start = System.nanoTime - data.rewind() - logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " + -s"To node: $peer") - try { -blockTransferService.uploadBlockSync( - peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) - } catch { -case e: Exception => - logError(s"Failed to replicate block to $peer", e) +// One by one choose a random peer and try uploading the block to it +// If replication fails (e.g., target peer is down), force the list of cached peers +// to be re-fetched from driver and then pick another random peer for replication. Also +// temporarily black list the peer for which replication failed. 
+// +// This selection of a peer and replication is continued in a loop until one of the +// following 3 conditions is fulfilled: +// (i) specified number of peers have been replicated to +// (ii) too many failures in replicating to peers +// (iii) no peer left to replicate to +// +while (!done) { + getRandomPeer() match { +case Some(peer) => + try { +val onePeerStartTime = System.currentTimeMillis +data.rewind() +logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer") +blockTransferService.uploadBlockSync( + peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) +logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms" + .format((System.currentTimeMillis - onePeerStartTime) /
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18201286 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +791,110 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { +peerFetchLock.synchronized { + val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds + val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + if (cachedPeers == null || forceFetch || timeout) { +cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } + cachedPeers +} + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersForReplication = new ArrayBuffer[BlockManagerId] +val peersReplicatedTo = new ArrayBuffer[BlockManagerId] +val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime +val random = new Random(blockId.hashCode) + +var replicationFailed = false +var failures = 0 +var done = false + +// Get cached list of peers +peersForReplication ++= getPeers(forceFetch = false) + +// Get a random peer. Note that this selection of a peer is deterministic on the block id. 
+// So assuming the list of peers does not change and no replication failures, +// if there are multiple attempts in the same node to replicate the same block, +// the same set of peers will be selected. +def getRandomPeer(): Option[BlockManagerId] = { + // If replication had failed, then force update the cached list of peers and remove the peers + // that have been already used + if (replicationFailed) { +peersForReplication.clear() +peersForReplication ++= getPeers(forceFetch = true) +peersForReplication --= peersReplicatedTo +peersForReplication --= peersFailedToReplicateTo + } + if (!peersForReplication.isEmpty) { +Some(peersForReplication(random.nextInt(peersForReplication.size))) + } else { +None + } } -for (peer: BlockManagerId <- cachedPeers) { - val start = System.nanoTime - data.rewind() - logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " + -s"To node: $peer") - try { -blockTransferService.uploadBlockSync( - peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) - } catch { -case e: Exception => - logError(s"Failed to replicate block to $peer", e) +// One by one choose a random peer and try uploading the block to it +// If replication fails (e.g., target peer is down), force the list of cached peers +// to be re-fetched from driver and then pick another random peer for replication. Also +// temporarily black list the peer for which replication failed. 
+// +// This selection of a peer and replication is continued in a loop until one of the +// following 3 conditions is fulfilled: +// (i) specified number of peers have been replicated to +// (ii) too many failures in replicating to peers +// (iii) no peer left to replicate to +// +while (!done) { + getRandomPeer() match { +case Some(peer) => + try { +val onePeerStartTime = System.currentTimeMillis +data.rewind() +logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer") +blockTransferService.uploadBlockSync( + peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) +logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms" + .format((System.currentTimeMillis - onePeerStartTime) /
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18233118 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala --- @@ -59,6 +59,8 @@ class BlockManagerId private ( def port: Int = port_ + def isDriver = (executorId == "") --- End diff -- Right! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18233139 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds +val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + +cachedPeers.synchronized { + if (cachedPeers.isEmpty || forceFetch || timeout) { +cachedPeers.clear() +cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } +} +cachedPeers + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersReplicatedTo = new HashSet[BlockManagerId] +val peersFailedToReplicateTo = new HashSet[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime --- End diff -- Sorry :P --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-57351691 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21043/ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-57352608 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21044/consoleFull) for PR 2366 at commit [`0661773`](https://github.com/apache/spark/commit/06617739863b3c79a7be91e83bf382336b03083e). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-57363077 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21044/consoleFull) for PR 2366 at commit [`0661773`](https://github.com/apache/spark/commit/06617739863b3c79a7be91e83bf382336b03083e). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-57363086 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21044/ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-57402676 @rxin This is good to go! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18259626 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -1228,4 +1244,314 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) } + + test("get peers with addition and removal of block managers") { --- End diff -- Should we create a new test suite, BlockManagerReplicationSuite? This file is getting long. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-57676144 Jenkins, test this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-57676219 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21214/consoleFull) for PR 2366 at commit [`9690f57`](https://github.com/apache/spark/commit/9690f57be441ea15b3c9de040d6ba07bec262e22). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-57678711 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21215/ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-57692138 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21214/consoleFull) for PR 2366 at commit [`9690f57`](https://github.com/apache/spark/commit/9690f57be441ea15b3c9de040d6ba07bec262e22). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * ` case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-57692143 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21214/ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-57703944 Thanks. Merging in master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/2366 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/2366 [SPARK-3495] Block replication fails continuously when the replication target node is dead If a block manager (say, A) wants to replicate a block and the node chosen for replication (say, B) is dead, then the attempt to send the block to B fails. However, this continues to fail indefinitely. Even if the driver learns about the demise of B, A continues to try replicating to B and failing miserably. The reason behind this bug is that A initially fetches a list of peers from the driver (when B was active), but never updates it after B is dead. This affects Spark Streaming as its receiver uses block replication. The solution in this patch adds the following. - Changed BlockManagerMaster to return all the peers of a block manager, rather than the requested number. - Refactored BlockManager's replication code to handle peer caching correctly. + The peer for replication is randomly selected. This is different from past behavior where for a node A, a node B was deterministically chosen for the lifetime of the application. + If replication fails to one node, the peers are refetched. + The cached peer list has a TTL of 1 second to enable discovery of new peers and using them for replication. - Added replication unit tests (replication was not tested till now, duh!) This should not make a difference in performance of Spark workloads where replication is not used. 
@andrewor14 @JoshRosen You can merge this pull request into a Git repository by running: $ git pull https://github.com/tdas/spark replication-fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2366.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2366 commit af0c1daea8a22bca3b7826322205c887370ce247 Author: Tathagata Das Date: 2014-09-11T02:54:31Z Added replication unit tests to BlockManagerSuite commit 9f0ac9fb20660ff183490d13f4e3195b9283bc61 Author: Tathagata Das Date: 2014-09-11T08:44:18Z Modified replication tests to fail on replication bug. commit d081bf60e87689994a006603f84cb8f22ab19c6a Author: Tathagata Das Date: 2014-09-11T20:58:14Z Fixed bug in get peers and unit tests to test get-peers and replication under executor churn. commit 03de02d532f51b23bc1b79fc76115aacbd64a4b1 Author: Tathagata Das Date: 2014-09-12T00:46:02Z Change replication logic to correctly refetch peers from master on failure and on new worker addition. commit 7598f913c52728f25b6bce91dd9ae6879105e261 Author: Tathagata Das Date: 2014-09-12T00:52:16Z Minor changes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-55351240 test this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-55351493 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20190/consoleFull) for PR 2366 at commit [`7598f91`](https://github.com/apache/spark/commit/7598f913c52728f25b6bce91dd9ae6879105e261). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-55352316 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20193/consoleFull) for PR 2366 at commit [`4a20531`](https://github.com/apache/spark/commit/4a205314cf1fc9a7a413bc1fb066fe2bb2c21932). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17460098 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,87 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 1000) // milliseconds --- End diff -- Is 1000 good enough? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-55354916 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20190/consoleFull) for PR 2366 at commit [`7598f91`](https://github.com/apache/spark/commit/7598f913c52728f25b6bce91dd9ae6879105e261). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * ` case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-55355335 The GetPeers class signature has been changed from `case class GetPeers(blockManagerId: BlockManagerId, numPeers: Int)` to `case class GetPeers(blockManagerId: BlockManagerId)` . This is probably okay as this is a developer API. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-55355811 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20193/consoleFull) for PR 2366 at commit [`4a20531`](https://github.com/apache/spark/commit/4a205314cf1fc9a7a413bc1fb066fe2bb2c21932). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * ` case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-55372429 Isn't 1s cache span too low? How often will we get a cache hit if they expire in 1 sec? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17493889 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala --- @@ -59,6 +59,8 @@ class BlockManagerId private ( def port: Int = port_ + def isDriver = (executorId == "") --- End diff -- +1 on this. I added this TODO a while back and I think this also affects some UI code. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-55447441 Yeah, 1 second is probably too low (though there will be 4/5 cache hits for every miss for streaming). Better to have it be a minute. It's a tradeoff between cache hit rate and how fast we want streaming to find new nodes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17497968 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala --- @@ -403,16 +402,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus blockIds.map(blockId => getLocations(blockId)) } - private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = { -val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray - -val selfIndex = peers.indexOf(blockManagerId) + /** Get the list of the peers of the given block manager */ + private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = { +val blockManagerIds = blockManagerInfo.keySet.filterNot { _.isDriver }.toArray +val selfIndex = blockManagerIds.indexOf(blockManagerId) --- End diff -- Why not just do `contains`? Then we don't need to convert this to an array and use indexof --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17498010 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala --- @@ -403,16 +402,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus blockIds.map(blockId => getLocations(blockId)) } - private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = { -val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray - -val selfIndex = peers.indexOf(blockManagerId) + /** Get the list of the peers of the given block manager */ + private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = { +val blockManagerIds = blockManagerInfo.keySet.filterNot { _.isDriver }.toArray +val selfIndex = blockManagerIds.indexOf(blockManagerId) if (selfIndex == -1) { - throw new SparkException("Self index for " + blockManagerId + " not found") + logError("Self index for " + blockManagerId + " not found") + Seq.empty +} else { + // If the blockManagerIds is [ id1 id2 id3 id4 id5 ] and the blockManagerId is id2 + // Then this code will return the list [ id3 id4 id5 id1 ] + Array.tabulate[BlockManagerId](blockManagerIds.size - 1) { i => +blockManagerIds((selfIndex + i + 1) % blockManagerIds.size) --- End diff -- Here we can just subtract it from the set instead of doing this complicated logic, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17498022 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala --- @@ -403,16 +402,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus blockIds.map(blockId => getLocations(blockId)) } - private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = { -val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray - -val selfIndex = peers.indexOf(blockManagerId) + /** Get the list of the peers of the given block manager */ --- End diff -- Maybe add a comment to explain that this excludes self --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17498056 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala --- @@ -403,16 +402,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus blockIds.map(blockId => getLocations(blockId)) } - private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = { -val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray - -val selfIndex = peers.indexOf(blockManagerId) + /** Get the list of the peers of the given block manager */ + private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = { +val blockManagerIds = blockManagerInfo.keySet.filterNot { _.isDriver }.toArray +val selfIndex = blockManagerIds.indexOf(blockManagerId) --- End diff -- Yeah, had just kept the existing code here as is. Will change, obviously better. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17498083 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -62,6 +63,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter val securityMgr = new SecurityManager(conf) val mapOutputTracker = new MapOutputTrackerMaster(conf) val shuffleManager = new HashShuffleManager(conf) + val allStores = new ArrayBuffer[BlockManager] --- End diff -- Can you add a comment explaining that we need this so we can ensure all prior block managers are stopped before starting each test? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17498146 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala --- @@ -403,16 +402,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus blockIds.map(blockId => getLocations(blockId)) } - private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = { -val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray - -val selfIndex = peers.indexOf(blockManagerId) + /** Get the list of the peers of the given block manager */ + private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = { +val blockManagerIds = blockManagerInfo.keySet.filterNot { _.isDriver }.toArray +val selfIndex = blockManagerIds.indexOf(blockManagerId) if (selfIndex == -1) { - throw new SparkException("Self index for " + blockManagerId + " not found") + logError("Self index for " + blockManagerId + " not found") + Seq.empty +} else { + // If the blockManagerIds is [ id1 id2 id3 id4 id5 ] and the blockManagerId is id2 + // Then this code will return the list [ id3 id4 id5 id1 ] + Array.tabulate[BlockManagerId](blockManagerIds.size - 1) { i => +blockManagerIds((selfIndex + i + 1) % blockManagerIds.size) --- End diff -- or through `filterNot` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17498158 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala --- @@ -403,16 +402,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus blockIds.map(blockId => getLocations(blockId)) } - private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = { -val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray - -val selfIndex = peers.indexOf(blockManagerId) + /** Get the list of the peers of the given block manager */ + private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = { +val blockManagerIds = blockManagerInfo.keySet.filterNot { _.isDriver }.toArray +val selfIndex = blockManagerIds.indexOf(blockManagerId) if (selfIndex == -1) { - throw new SparkException("Self index for " + blockManagerId + " not found") + logError("Self index for " + blockManagerId + " not found") + Seq.empty +} else { + // If the blockManagerIds is [ id1 id2 id3 id4 id5 ] and the blockManagerId is id2 + // Then this code will return the list [ id3 id4 id5 id1 ] + Array.tabulate[BlockManagerId](blockManagerIds.size - 1) { i => +blockManagerIds((selfIndex + i + 1) % blockManagerIds.size) --- End diff -- Again, I had just preserved the existing logic here. However, since I am changing the overall logic from deterministic peer selection (where I want to maintain the order they are returned, so that no two nodes use the same 3rd node as replication target) to random peer selection, I think your suggestion makes sense. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17498282 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) } + + test("get peers with store addition and removal") { --- End diff -- mind calling this BlockManager instead of store? I think most people don't call it store actually --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17498486 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) } + + test("get peers with store addition and removal") { +val numStores = 4 +val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") } +val storeIds = stores.map { _.blockManagerId }.toSet +assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId }) +assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId }) +assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId }) + +// Add driver store and test whether it is filtered out +val driverStore = makeBlockManager(1000, "") +assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver)) +assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver)) +assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver)) + +// Add a new store and test whether get peers returns it +val newStore = makeBlockManager(1000, s"store$numStores") +assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(newStore.blockManagerId).toSet === storeIds) + +// Remove a store and test whether get peers returns it +val storeIdToRemove = 
stores(0).blockManagerId +master.removeExecutor(storeIdToRemove.executorId) + assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove)) + +// Test whether asking for peers of a unregistered block manager id returns empty list +assert(master.getPeers(stores(0).blockManagerId).isEmpty) +assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty) + } + + test("block replication - 2x") { +testReplication(2, + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2) +) + } + + test("block replication - 3x") { +// Generate storage levels with 3x replication +val storageLevels = { + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map { +level => StorageLevel( + level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 3) + } +} +testReplication(3, storageLevels) + } + + test("block replication - mixed between 1x to 5x") { +// Generate storage levels with varying replication +val storageLevels = Seq( + MEMORY_ONLY, + MEMORY_ONLY_SER_2, + StorageLevel(true, false, false, false, 3), + StorageLevel(true, true, false, true, 4), + StorageLevel(true, true, false, false, 5), + StorageLevel(true, true, false, true, 4), + StorageLevel(true, false, false, false, 3), + MEMORY_ONLY_SER_2, + MEMORY_ONLY +) +testReplication(5, storageLevels) + } + + test("block replication with addition and deletion of executors") { +val blockSize = 1000 +val storeSize = 1 +val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") } + +def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) { + try { +initialStores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) +assert(master.getLocations(blockId).size === expectedNumLocations) + } finally { +master.removeBlock(blockId) + } --- End diff -- 
What's the point of this finally? If an assertion fails don't we just move on to the next test? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if th
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17498517 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) } + + test("get peers with store addition and removal") { +val numStores = 4 +val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") } +val storeIds = stores.map { _.blockManagerId }.toSet +assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId }) +assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId }) +assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId }) + +// Add driver store and test whether it is filtered out +val driverStore = makeBlockManager(1000, "") +assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver)) +assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver)) +assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver)) + +// Add a new store and test whether get peers returns it +val newStore = makeBlockManager(1000, s"store$numStores") +assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(newStore.blockManagerId).toSet === storeIds) + +// Remove a store and test whether get peers returns it +val storeIdToRemove = 
stores(0).blockManagerId +master.removeExecutor(storeIdToRemove.executorId) + assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove)) + +// Test whether asking for peers of a unregistered block manager id returns empty list +assert(master.getPeers(stores(0).blockManagerId).isEmpty) +assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty) + } + + test("block replication - 2x") { +testReplication(2, + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2) +) + } + + test("block replication - 3x") { +// Generate storage levels with 3x replication +val storageLevels = { + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map { +level => StorageLevel( + level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 3) + } +} +testReplication(3, storageLevels) + } + + test("block replication - mixed between 1x to 5x") { +// Generate storage levels with varying replication +val storageLevels = Seq( + MEMORY_ONLY, + MEMORY_ONLY_SER_2, + StorageLevel(true, false, false, false, 3), + StorageLevel(true, true, false, true, 4), + StorageLevel(true, true, false, false, 5), + StorageLevel(true, true, false, true, 4), + StorageLevel(true, false, false, false, 3), + MEMORY_ONLY_SER_2, + MEMORY_ONLY +) +testReplication(5, storageLevels) + } + + test("block replication with addition and deletion of executors") { +val blockSize = 1000 +val storeSize = 1 +val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") } + +def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) { + try { +initialStores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) +assert(master.getLocations(blockId).size === expectedNumLocations) + } finally { +master.removeBlock(blockId) + } +} + +// 2x 
replication should work, 3x replication should only replicate 2x +testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2) +testPut("a2", StorageLevel(true, true, false, true, 3), 2) + +// Add another store, 3x replication should work now, 4x replication should
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17498587 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) } + + test("get peers with store addition and removal") { +val numStores = 4 +val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") } +val storeIds = stores.map { _.blockManagerId }.toSet +assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId }) +assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId }) +assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId }) + +// Add driver store and test whether it is filtered out +val driverStore = makeBlockManager(1000, "") +assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver)) +assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver)) +assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver)) + +// Add a new store and test whether get peers returns it +val newStore = makeBlockManager(1000, s"store$numStores") +assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(newStore.blockManagerId).toSet === storeIds) + +// Remove a store and test whether get peers returns it +val storeIdToRemove = 
stores(0).blockManagerId +master.removeExecutor(storeIdToRemove.executorId) + assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove)) + +// Test whether asking for peers of a unregistered block manager id returns empty list +assert(master.getPeers(stores(0).blockManagerId).isEmpty) +assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty) + } + + test("block replication - 2x") { +testReplication(2, + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2) +) + } + + test("block replication - 3x") { +// Generate storage levels with 3x replication +val storageLevels = { + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map { +level => StorageLevel( + level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 3) + } +} +testReplication(3, storageLevels) + } + + test("block replication - mixed between 1x to 5x") { +// Generate storage levels with varying replication +val storageLevels = Seq( + MEMORY_ONLY, + MEMORY_ONLY_SER_2, + StorageLevel(true, false, false, false, 3), + StorageLevel(true, true, false, true, 4), + StorageLevel(true, true, false, false, 5), + StorageLevel(true, true, false, true, 4), + StorageLevel(true, false, false, false, 3), + MEMORY_ONLY_SER_2, + MEMORY_ONLY +) +testReplication(5, storageLevels) + } + + test("block replication with addition and deletion of executors") { +val blockSize = 1000 +val storeSize = 1 +val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") } + +def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) { + try { +initialStores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) +assert(master.getLocations(blockId).size === expectedNumLocations) + } finally { +master.removeBlock(blockId) + } +} + +// 2x 
replication should work, 3x replication should only replicate 2x +testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2) +testPut("a2", StorageLevel(true, true, false, true, 3), 2) + +// Add another store, 3x replication should work now, 4x replication should
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17498639 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) } + + test("get peers with store addition and removal") { +val numStores = 4 +val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") } +val storeIds = stores.map { _.blockManagerId }.toSet +assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId }) +assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId }) +assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId }) + +// Add driver store and test whether it is filtered out +val driverStore = makeBlockManager(1000, "") +assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver)) +assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver)) +assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver)) + +// Add a new store and test whether get peers returns it +val newStore = makeBlockManager(1000, s"store$numStores") +assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(newStore.blockManagerId).toSet === storeIds) + +// Remove a store and test whether get peers returns it +val storeIdToRemove = 
stores(0).blockManagerId +master.removeExecutor(storeIdToRemove.executorId) + assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove)) + +// Test whether asking for peers of a unregistered block manager id returns empty list +assert(master.getPeers(stores(0).blockManagerId).isEmpty) +assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty) + } + + test("block replication - 2x") { +testReplication(2, + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2) +) + } + + test("block replication - 3x") { +// Generate storage levels with 3x replication +val storageLevels = { + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map { +level => StorageLevel( + level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 3) + } +} +testReplication(3, storageLevels) + } + + test("block replication - mixed between 1x to 5x") { +// Generate storage levels with varying replication +val storageLevels = Seq( + MEMORY_ONLY, + MEMORY_ONLY_SER_2, + StorageLevel(true, false, false, false, 3), + StorageLevel(true, true, false, true, 4), + StorageLevel(true, true, false, false, 5), + StorageLevel(true, true, false, true, 4), + StorageLevel(true, false, false, false, 3), + MEMORY_ONLY_SER_2, + MEMORY_ONLY +) +testReplication(5, storageLevels) + } + + test("block replication with addition and deletion of executors") { +val blockSize = 1000 +val storeSize = 1 +val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") } + +def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) { + try { +initialStores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) +assert(master.getLocations(blockId).size === expectedNumLocations) + } finally { +master.removeBlock(blockId) + } +} + +// 2x 
replication should work, 3x replication should only replicate 2x +testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2) +testPut("a2", StorageLevel(true, true, false, true, 3), 2) + +// Add another store, 3x replication should work now, 4x replication should
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-55451730 I looked at the logic in `BlockManager` in detail and it looks reasonable. I left a few minor comments all over the place, but in general this LGTM. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-55472830 What happens when there is recomputation which results in the same blockId getting regenerated (unpersist followed by recomputation/persist, or block drop followed by recomputation, or something else)? It will now go to some random node, potentially not the same as the one previously selected? Resulting in over-replication? A more corner case is if the computation was not idempotent ... and resulted in a changed dataset for the block - earlier it would get overwritten as part of replication: will we now have two nodes with the same data and a third (initially replicated to) which can diverge? Btw, from what I saw, node loss is not handled, right? So a block can get under-replicated? Would be nice if we added that in some day ... Streaming is not the only application for replication :-) We use it in conjunction with locality wait levels to speed up computation when speculative execution is enabled. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-56293506 @mridulm Very good thoughts! I totally agree that replication is not only for streaming, and the implications of this patch in other scenarios are important to understand. Let me address them one by one. *1. Over-replication:* I can see that there is a chance that some sequence of events can cause a block to be replicated to another node even if the node. But, I am trying to construct a scenario which would lead to over-replication with this patch BUT does not lead to over-replication in current Spark. That is, I want to make sure that there is no regression in the behavior. Note that choosing nodes randomly can lead to more uniform memory usage in the cluster. So there is definitely an advantage to this patch. However, to preserve previous behavior we can make the randomization deterministic with respect to the block id. So if a block is recomputed on the same node where it had existed, it will get replicated to the same replication target as before. For each block, this is no different from the current behavior, hence should not create a regression in terms of the chances of over-replication. How does that sound? *2. Non-idempotent computations:* In case of such operations, there are other issues in the existing Spark as well. In the current block manager's put behavior, if a block exists locally, it is not re-added as it is assumed that block contents are identical. As a result, with current Spark you can potentially have two nodes with two different versions of the same block. One can argue that this patch can increase the chances. The fix I proposed above should take care of it; the probability of that happening won't be different. *3. Under-replication:* Yes, node loss was not and is still not handled and is outside the scope of this patch. Sure would be nice to add it some day. But that definitely leads to further complexity in the BlockManager. 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-56293724 @tdas handling (1) deterministically will make (2) in line with what we currently have. And that should be sufficient imo. (3) was not in context of this patch - but a general shortcoming of spark currently. Alleviating (3) might be complicated (not sure how much so) - but will have some very interesting consequences for performance (among others). For example: this prevents us from using block persistence for checkpoint - there was a discussion about this in a JIRA a while back (forgot id) ... resolving this and with 3x replicated blocks, will mean we get really cheap and very performant checkpoint (while having fault tolerance on par with hdfs) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17825042 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala --- @@ -403,16 +402,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus blockIds.map(blockId => getLocations(blockId)) } - private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = { -val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray - -val selfIndex = peers.indexOf(blockManagerId) + /** Get the list of the peers of the given block manager */ --- End diff -- "peers" automatically means that it does not include the self. It should be obvious. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17825062 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) } + + test("get peers with store addition and removal") { --- End diff -- Changed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17825064 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -62,6 +63,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter val securityMgr = new SecurityManager(conf) val mapOutputTracker = new MapOutputTrackerMaster(conf) val shuffleManager = new HashShuffleManager(conf) + val allStores = new ArrayBuffer[BlockManager] --- End diff -- Added. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17825538 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) } + + test("get peers with store addition and removal") { +val numStores = 4 +val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") } +val storeIds = stores.map { _.blockManagerId }.toSet +assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId }) +assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId }) +assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId }) + +// Add driver store and test whether it is filtered out +val driverStore = makeBlockManager(1000, "") +assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver)) +assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver)) +assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver)) + +// Add a new store and test whether get peers returns it +val newStore = makeBlockManager(1000, s"store$numStores") +assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(newStore.blockManagerId).toSet === storeIds) + +// Remove a store and test whether get peers returns it +val storeIdToRemove = 
stores(0).blockManagerId +master.removeExecutor(storeIdToRemove.executorId) + assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove)) + +// Test whether asking for peers of a unregistered block manager id returns empty list +assert(master.getPeers(stores(0).blockManagerId).isEmpty) +assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty) + } + + test("block replication - 2x") { +testReplication(2, + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2) +) + } + + test("block replication - 3x") { +// Generate storage levels with 3x replication +val storageLevels = { + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map { +level => StorageLevel( + level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 3) + } +} +testReplication(3, storageLevels) + } + + test("block replication - mixed between 1x to 5x") { +// Generate storage levels with varying replication +val storageLevels = Seq( + MEMORY_ONLY, + MEMORY_ONLY_SER_2, + StorageLevel(true, false, false, false, 3), + StorageLevel(true, true, false, true, 4), + StorageLevel(true, true, false, false, 5), + StorageLevel(true, true, false, true, 4), + StorageLevel(true, false, false, false, 3), + MEMORY_ONLY_SER_2, + MEMORY_ONLY +) +testReplication(5, storageLevels) + } + + test("block replication with addition and deletion of executors") { +val blockSize = 1000 +val storeSize = 1 +val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") } + +def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) { + try { +initialStores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) +assert(master.getLocations(blockId).size === expectedNumLocations) + } finally { +master.removeBlock(blockId) + } --- End diff -- 
This is confusing. The reason there is a `try...finally` is that this function `testPut` can be tried over and over again, inside an `eventually` block. So if the assertion fails, the system does not move on to the next unit test. The system needs to be reset by removing the block that w
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17825562 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) } + + test("get peers with store addition and removal") { +val numStores = 4 +val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") } +val storeIds = stores.map { _.blockManagerId }.toSet +assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId }) +assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId }) +assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId }) + +// Add driver store and test whether it is filtered out +val driverStore = makeBlockManager(1000, "") +assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver)) +assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver)) +assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver)) + +// Add a new store and test whether get peers returns it +val newStore = makeBlockManager(1000, s"store$numStores") +assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(newStore.blockManagerId).toSet === storeIds) + +// Remove a store and test whether get peers returns it +val storeIdToRemove = 
stores(0).blockManagerId +master.removeExecutor(storeIdToRemove.executorId) + assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove)) + +// Test whether asking for peers of a unregistered block manager id returns empty list +assert(master.getPeers(stores(0).blockManagerId).isEmpty) +assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty) + } + + test("block replication - 2x") { +testReplication(2, + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2) +) + } + + test("block replication - 3x") { +// Generate storage levels with 3x replication +val storageLevels = { + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map { +level => StorageLevel( + level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 3) + } +} +testReplication(3, storageLevels) + } + + test("block replication - mixed between 1x to 5x") { +// Generate storage levels with varying replication +val storageLevels = Seq( + MEMORY_ONLY, + MEMORY_ONLY_SER_2, + StorageLevel(true, false, false, false, 3), + StorageLevel(true, true, false, true, 4), + StorageLevel(true, true, false, false, 5), + StorageLevel(true, true, false, true, 4), + StorageLevel(true, false, false, false, 3), + MEMORY_ONLY_SER_2, + MEMORY_ONLY +) +testReplication(5, storageLevels) + } + + test("block replication with addition and deletion of executors") { +val blockSize = 1000 +val storeSize = 1 +val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") } + +def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) { + try { +initialStores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) +assert(master.getLocations(blockId).size === expectedNumLocations) + } finally { +master.removeBlock(blockId) + } +} + +// 2x 
replication should work, 3x replication should only replicate 2x +testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2) +testPut("a2", StorageLevel(true, true, false, true, 3), 2) + +// Add another store, 3x replication should work now, 4x replication should only
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17825572 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) } + + test("get peers with store addition and removal") { +val numStores = 4 +val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") } +val storeIds = stores.map { _.blockManagerId }.toSet +assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId }) +assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId }) +assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId }) + +// Add driver store and test whether it is filtered out +val driverStore = makeBlockManager(1000, "") +assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver)) +assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver)) +assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver)) + +// Add a new store and test whether get peers returns it +val newStore = makeBlockManager(1000, s"store$numStores") +assert(master.getPeers(stores(0).blockManagerId).toSet === + storeIds.filterNot { _ == stores(0).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(stores(1).blockManagerId).toSet === + storeIds.filterNot { _ == stores(1).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(stores(2).blockManagerId).toSet === + storeIds.filterNot { _ == stores(2).blockManagerId } + newStore.blockManagerId) +assert(master.getPeers(newStore.blockManagerId).toSet === storeIds) + +// Remove a store and test whether get peers returns it +val storeIdToRemove = 
stores(0).blockManagerId +master.removeExecutor(storeIdToRemove.executorId) + assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove)) + assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove)) + +// Test whether asking for peers of a unregistered block manager id returns empty list +assert(master.getPeers(stores(0).blockManagerId).isEmpty) +assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty) + } + + test("block replication - 2x") { +testReplication(2, + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2) +) + } + + test("block replication - 3x") { +// Generate storage levels with 3x replication +val storageLevels = { + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map { +level => StorageLevel( + level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 3) + } +} +testReplication(3, storageLevels) + } + + test("block replication - mixed between 1x to 5x") { +// Generate storage levels with varying replication +val storageLevels = Seq( + MEMORY_ONLY, + MEMORY_ONLY_SER_2, + StorageLevel(true, false, false, false, 3), + StorageLevel(true, true, false, true, 4), + StorageLevel(true, true, false, false, 5), + StorageLevel(true, true, false, true, 4), + StorageLevel(true, false, false, false, 3), + MEMORY_ONLY_SER_2, + MEMORY_ONLY +) +testReplication(5, storageLevels) + } + + test("block replication with addition and deletion of executors") { +val blockSize = 1000 +val storeSize = 1 +val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") } + +def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) { + try { +initialStores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) +assert(master.getLocations(blockId).size === expectedNumLocations) + } finally { +master.removeBlock(blockId) + } +} + +// 2x 
replication should work, 3x replication should only replicate 2x +testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2) +testPut("a2", StorageLevel(true, true, false, true, 3), 2) + +// Add another store, 3x replication should work now, 4x replication should only
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-56311396 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20634/consoleFull) for PR 2366 at commit [`3821ab9`](https://github.com/apache/spark/commit/3821ab971bcc85b182288f9039bf38da0acedece). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-56313463 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20634/consoleFull) for PR 2366 at commit [`3821ab9`](https://github.com/apache/spark/commit/3821ab971bcc85b182288f9039bf38da0acedece). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * ` case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-56317365 @mridulm I implemented (1) and also added a unit test for testing that behavior. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-56317539 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20636/consoleFull) for PR 2366 at commit [`08afaa9`](https://github.com/apache/spark/commit/08afaa94e0672ae60bee6737c040ec0d9de9d268). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17829791 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { --- End diff -- can you rename this updatePeersFromMaster? the current name seems to suggest it is a really cheap getter. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-56319636 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20636/consoleFull) for PR 2366 at commit [`08afaa9`](https://github.com/apache/spark/commit/08afaa94e0672ae60bee6737c040ec0d9de9d268). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * ` case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17829854 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { --- End diff -- also update the comment to say more, e.g. that this is fetching an updated list from the driver. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17829869 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -111,6 +111,8 @@ private[spark] class BlockManager( MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf) private val broadcastCleaner = new MetadataCleaner( MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf) + private val cachedPeers = new HashSet[BlockManagerId] --- End diff -- when do u ever remove entries from here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17829908 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -111,6 +111,8 @@ private[spark] class BlockManager( MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf) private val broadcastCleaner = new MetadataCleaner( MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf) + private val cachedPeers = new HashSet[BlockManagerId] --- End diff -- ah ic you clear the hashset down there ... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17829919 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds +val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + +cachedPeers.synchronized { --- End diff -- you should also do the timeout check in the synchronized block - because otherwise two racing requests will immediately send two requests to the driver, and the requests are kind of expensive. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17829945 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { --- End diff -- actually -- never mind my renaming comment, since this uses the cache. you should still update the comment to state what this does though. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17829956 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds +val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + +cachedPeers.synchronized { + if (cachedPeers.isEmpty || forceFetch || timeout) { +cachedPeers.clear() +cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } +} +cachedPeers + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersReplicatedTo = new HashSet[BlockManagerId] +val peersFailedToReplicateTo = new HashSet[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime +val random = new Random(blockId.hashCode) + +var forceFetchPeers = false +var failures = 0 +var done = false + +// Get a random peer +def getRandomPeer(): Option[BlockManagerId] = { + val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo + if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None --- End diff -- the toSeq seems expensive? 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17829979 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds +val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + +cachedPeers.synchronized { + if (cachedPeers.isEmpty || forceFetch || timeout) { +cachedPeers.clear() +cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } +} +cachedPeers + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersReplicatedTo = new HashSet[BlockManagerId] +val peersFailedToReplicateTo = new HashSet[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime +val random = new Random(blockId.hashCode) + +var forceFetchPeers = false +var failures = 0 +var done = false + +// Get a random peer +def getRandomPeer(): Option[BlockManagerId] = { + val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo + if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None } -for (peer: BlockManagerId <- cachedPeers) { - val start = System.nanoTime - data.rewind() - 
logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " + -s"To node: $peer") - try { -blockTransferService.uploadBlockSync( - peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) - } catch { -case e: Exception => - logError(s"Failed to replicate block to $peer", e) +// One by one choose a random peer and try uploading the block to it +// If replication fails (e.g., target peer is down), force the list of cached peers +// to be re-fetched from driver and then pick another random peer for replication. Also +// temporarily black list the peer for which replication failed. +while (!done) { --- End diff -- can we get rid of done and just use ```scala while (peersReplicatedTo.size < numPeersToReplicateTo && failures <= maxReplicationFailures) { ``` otherwise we have to track the place where done is updated to find out when it is done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17830017 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds +val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + +cachedPeers.synchronized { + if (cachedPeers.isEmpty || forceFetch || timeout) { +cachedPeers.clear() +cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } +} +cachedPeers + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersReplicatedTo = new HashSet[BlockManagerId] +val peersFailedToReplicateTo = new HashSet[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime +val random = new Random(blockId.hashCode) + +var forceFetchPeers = false +var failures = 0 +var done = false + +// Get a random peer +def getRandomPeer(): Option[BlockManagerId] = { + val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo + if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None } -for (peer: BlockManagerId <- cachedPeers) { - val start = System.nanoTime - data.rewind() - 
logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " + -s"To node: $peer") - try { -blockTransferService.uploadBlockSync( - peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) - } catch { -case e: Exception => - logError(s"Failed to replicate block to $peer", e) +// One by one choose a random peer and try uploading the block to it +// If replication fails (e.g., target peer is down), force the list of cached peers +// to be re-fetched from driver and then pick another random peer for replication. Also +// temporarily black list the peer for which replication failed. +while (!done) { --- End diff -- ah it's trickier to handle the None case. Ok in that case let's keep the done, but do comment explicitly on the three conditions that this will terminate. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17830025 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds +val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + +cachedPeers.synchronized { + if (cachedPeers.isEmpty || forceFetch || timeout) { +cachedPeers.clear() +cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } +} +cachedPeers + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersReplicatedTo = new HashSet[BlockManagerId] +val peersFailedToReplicateTo = new HashSet[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime --- End diff -- why nano instead of current milli? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17831655 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { --- End diff -- `updatePeersFromMaster` is a misnomer as it does not actually query master unless the parameter is true or the TTL has expired (which I have increased to 60 seconds). MOST of the time it is a cheap operation. So I am not sure what's best here. I am totally open to more suggestions. How about keeping it `getPeers` with the param named `forceUpdateFromMaster`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17833363 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds +val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + +cachedPeers.synchronized { + if (cachedPeers.isEmpty || forceFetch || timeout) { +cachedPeers.clear() +cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } +} +cachedPeers + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersReplicatedTo = new HashSet[BlockManagerId] +val peersFailedToReplicateTo = new HashSet[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime +val random = new Random(blockId.hashCode) + +var forceFetchPeers = false +var failures = 0 +var done = false + +// Get a random peer +def getRandomPeer(): Option[BlockManagerId] = { + val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo + if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None --- End diff -- This would mean that, assuming there is no change to number of executors in system, 
we will consistently get the same peer back. But even if some unrelated peer was added or removed, we won't get back the same peer we cached to last time. Did I read that correctly, or am I missing something? Thanks. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17833383 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds +val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + +cachedPeers.synchronized { + if (cachedPeers.isEmpty || forceFetch || timeout) { +cachedPeers.clear() +cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } +} +cachedPeers + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersReplicatedTo = new HashSet[BlockManagerId] +val peersFailedToReplicateTo = new HashSet[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime +val random = new Random(blockId.hashCode) + +var forceFetchPeers = false +var failures = 0 +var done = false + +// Get a random peer +def getRandomPeer(): Option[BlockManagerId] = { + val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo + if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None } -for (peer: BlockManagerId <- cachedPeers) { - val start = System.nanoTime - data.rewind() - 
logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " + -s"To node: $peer") - try { -blockTransferService.uploadBlockSync( - peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) - } catch { -case e: Exception => - logError(s"Failed to replicate block to $peer", e) +// One by one choose a random peer and try uploading the block to it +// If replication fails (e.g., target peer is down), force the list of cached peers +// to be re-fetched from driver and then pick another random peer for replication. Also +// temporarily black list the peer for which replication failed. +while (!done) { + getRandomPeer() match { +case Some(peer) => + try { +val onePeerStartTime = System.nanoTime +data.rewind() +logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer") +blockTransferService.uploadBlockSync( + peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) +logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms" + .format((System.nanoTime - onePeerStartTime) / 1e6)) +peersReplicatedTo += peer +forceFetchPeers = false +if (peersReplicatedTo.size == numPeersToReplicateTo) { + done = true +} + } catch { +case e: Exception => + logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e) + failures += 1 + forceFetchPeers = true + peersFailedToReplicateTo += peer --- End diff -- Ideally, we might want to cache this peersFailedToReplicateTo across block updates for a short ttl (to temporarily blacklist replication to peer). But that can be done in a future PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17833419 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds +val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + +cachedPeers.synchronized { + if (cachedPeers.isEmpty || forceFetch || timeout) { +cachedPeers.clear() +cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } +} +cachedPeers + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersReplicatedTo = new HashSet[BlockManagerId] +val peersFailedToReplicateTo = new HashSet[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime +val random = new Random(blockId.hashCode) + +var forceFetchPeers = false +var failures = 0 +var done = false + +// Get a random peer +def getRandomPeer(): Option[BlockManagerId] = { + val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo + if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None } -for (peer: BlockManagerId <- cachedPeers) { - val start = System.nanoTime - data.rewind() - 
logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " + -s"To node: $peer") - try { -blockTransferService.uploadBlockSync( - peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) - } catch { -case e: Exception => - logError(s"Failed to replicate block to $peer", e) +// One by one choose a random peer and try uploading the block to it +// If replication fails (e.g., target peer is down), force the list of cached peers +// to be re-fetched from driver and then pick another random peer for replication. Also +// temporarily black list the peer for which replication failed. +while (!done) { + getRandomPeer() match { +case Some(peer) => + try { +val onePeerStartTime = System.nanoTime +data.rewind() +logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer") +blockTransferService.uploadBlockSync( + peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) +logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms" + .format((System.nanoTime - onePeerStartTime) / 1e6)) +peersReplicatedTo += peer +forceFetchPeers = false +if (peersReplicatedTo.size == numPeersToReplicateTo) { + done = true +} + } catch { +case e: Exception => + logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e) + failures += 1 + forceFetchPeers = true + peersFailedToReplicateTo += peer --- End diff -- Btw, curious - will replication fail only when remote peer is dead? (and so requiring forceFetchPeers) What about inability to add block in remote peer? Will that cause an exception to be raised here? Essentially I am trying to understand if Exception raised here always means remote peer is 'dead'. Alternative might be to list peers which have at least data.rewind().remaining() space available: but we don't support that iirc (and it can get used up before we make this call anyway I guess). 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled a
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17833483 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds +val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + +cachedPeers.synchronized { + if (cachedPeers.isEmpty || forceFetch || timeout) { +cachedPeers.clear() +cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } +} +cachedPeers + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersReplicatedTo = new HashSet[BlockManagerId] +val peersFailedToReplicateTo = new HashSet[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime +val random = new Random(blockId.hashCode) + +var forceFetchPeers = false +var failures = 0 +var done = false + +// Get a random peer +def getRandomPeer(): Option[BlockManagerId] = { + val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo + if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None } -for (peer: BlockManagerId <- cachedPeers) { - val start = System.nanoTime - data.rewind() - 
logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " + -s"To node: $peer") - try { -blockTransferService.uploadBlockSync( - peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) - } catch { -case e: Exception => - logError(s"Failed to replicate block to $peer", e) +// One by one choose a random peer and try uploading the block to it +// If replication fails (e.g., target peer is down), force the list of cached peers +// to be re-fetched from driver and then pick another random peer for replication. Also +// temporarily black list the peer for which replication failed. +while (!done) { + getRandomPeer() match { +case Some(peer) => + try { +val onePeerStartTime = System.nanoTime +data.rewind() +logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer") +blockTransferService.uploadBlockSync( + peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) +logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms" + .format((System.nanoTime - onePeerStartTime) / 1e6)) +peersReplicatedTo += peer +forceFetchPeers = false +if (peersReplicatedTo.size == numPeersToReplicateTo) { + done = true +} + } catch { +case e: Exception => + logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e) + failures += 1 + forceFetchPeers = true + peersFailedToReplicateTo += peer + if (failures > maxReplicationFailures) { +done = true + } + } +case None => + // no peer left to replicate to + done = true --- End diff -- What if initial list had only self in executor list and we are within TTL (and so getPeers returns empty list) - bootstrapping time for example. Do we want to check if server has updates for us ? This will kind of hose our ttl though ... but maybe corner case. Or is this handled already ? Thanks --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17902630 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds +val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + +cachedPeers.synchronized { --- End diff -- Good catch! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17904273 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds +val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + +cachedPeers.synchronized { + if (cachedPeers.isEmpty || forceFetch || timeout) { +cachedPeers.clear() +cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } +} +cachedPeers + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersReplicatedTo = new HashSet[BlockManagerId] +val peersFailedToReplicateTo = new HashSet[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime +val random = new Random(blockId.hashCode) + +var forceFetchPeers = false +var failures = 0 +var done = false + +// Get a random peer +def getRandomPeer(): Option[BlockManagerId] = { + val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo + if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None --- End diff -- @mridulm You read that correctly, but @rxin point is different. 
He is wondering whether converting the `HashSet` to a `Seq` (so that we can select one by index) every time we select a peer could be expensive if the size of the peers is large (say, in a cluster with 1000s of nodes?). I am not entirely convinced that even if the list of nodes is O(1000) the computation is going to be expensive. But I am still going to make an attempt to make this more efficient. In fact, repeated calculation of `peers` in line 828 every time a peer needs to be selected, can also be avoided. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17906928 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds +val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + +cachedPeers.synchronized { + if (cachedPeers.isEmpty || forceFetch || timeout) { +cachedPeers.clear() +cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } +} +cachedPeers + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersReplicatedTo = new HashSet[BlockManagerId] +val peersFailedToReplicateTo = new HashSet[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime +val random = new Random(blockId.hashCode) + +var forceFetchPeers = false +var failures = 0 +var done = false + +// Get a random peer +def getRandomPeer(): Option[BlockManagerId] = { + val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo + if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None } -for (peer: BlockManagerId <- cachedPeers) { - val start = System.nanoTime - data.rewind() - 
logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " + -s"To node: $peer") - try { -blockTransferService.uploadBlockSync( - peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) - } catch { -case e: Exception => - logError(s"Failed to replicate block to $peer", e) +// One by one choose a random peer and try uploading the block to it +// If replication fails (e.g., target peer is down), force the list of cached peers +// to be re-fetched from driver and then pick another random peer for replication. Also +// temporarily black list the peer for which replication failed. +while (!done) { --- End diff -- I added comments before the while, as well as at all the 3 places where `done` is marked as `true`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17907153 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds +val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + +cachedPeers.synchronized { + if (cachedPeers.isEmpty || forceFetch || timeout) { +cachedPeers.clear() +cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } +} +cachedPeers + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersReplicatedTo = new HashSet[BlockManagerId] +val peersFailedToReplicateTo = new HashSet[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime --- End diff -- Because existing code did so. Changing to milli. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17916865 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds +val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + +cachedPeers.synchronized { + if (cachedPeers.isEmpty || forceFetch || timeout) { +cachedPeers.clear() +cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } +} +cachedPeers + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersReplicatedTo = new HashSet[BlockManagerId] +val peersFailedToReplicateTo = new HashSet[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime +val random = new Random(blockId.hashCode) + +var forceFetchPeers = false +var failures = 0 +var done = false + +// Get a random peer +def getRandomPeer(): Option[BlockManagerId] = { + val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo + if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None } -for (peer: BlockManagerId <- cachedPeers) { - val start = System.nanoTime - data.rewind() - 
logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " + -s"To node: $peer") - try { -blockTransferService.uploadBlockSync( - peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) - } catch { -case e: Exception => - logError(s"Failed to replicate block to $peer", e) +// One by one choose a random peer and try uploading the block to it +// If replication fails (e.g., target peer is down), force the list of cached peers +// to be re-fetched from driver and then pick another random peer for replication. Also +// temporarily black list the peer for which replication failed. +while (!done) { + getRandomPeer() match { +case Some(peer) => + try { +val onePeerStartTime = System.nanoTime +data.rewind() +logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer") +blockTransferService.uploadBlockSync( + peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) +logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms" + .format((System.nanoTime - onePeerStartTime) / 1e6)) +peersReplicatedTo += peer +forceFetchPeers = false +if (peersReplicatedTo.size == numPeersToReplicateTo) { + done = true +} + } catch { +case e: Exception => + logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e) + failures += 1 + forceFetchPeers = true + peersFailedToReplicateTo += peer --- End diff -- I agree that there may be other reasons for failure to send to a remote node. Even in those cases, the current behavior of re-fetching the peer list and sending to another node, is correct. Just not the most efficient. This optimization is something that can be addressed in a future PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17921361 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds +val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + +cachedPeers.synchronized { + if (cachedPeers.isEmpty || forceFetch || timeout) { +cachedPeers.clear() +cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } +} +cachedPeers + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersReplicatedTo = new HashSet[BlockManagerId] +val peersFailedToReplicateTo = new HashSet[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime +val random = new Random(blockId.hashCode) + +var forceFetchPeers = false +var failures = 0 +var done = false + +// Get a random peer +def getRandomPeer(): Option[BlockManagerId] = { + val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo + if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None } -for (peer: BlockManagerId <- cachedPeers) { - val start = System.nanoTime - data.rewind() - 
logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " + -s"To node: $peer") - try { -blockTransferService.uploadBlockSync( - peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) - } catch { -case e: Exception => - logError(s"Failed to replicate block to $peer", e) +// One by one choose a random peer and try uploading the block to it +// If replication fails (e.g., target peer is down), force the list of cached peers +// to be re-fetched from driver and then pick another random peer for replication. Also +// temporarily black list the peer for which replication failed. +while (!done) { + getRandomPeer() match { +case Some(peer) => + try { +val onePeerStartTime = System.nanoTime +data.rewind() +logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer") +blockTransferService.uploadBlockSync( + peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel) +logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms" + .format((System.nanoTime - onePeerStartTime) / 1e6)) +peersReplicatedTo += peer +forceFetchPeers = false +if (peersReplicatedTo.size == numPeersToReplicateTo) { + done = true +} + } catch { +case e: Exception => + logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e) + failures += 1 + forceFetchPeers = true + peersFailedToReplicateTo += peer + if (failures > maxReplicationFailures) { +done = true + } + } +case None => + // no peer left to replicate to + done = true --- End diff -- The initial list wont ever have self as the BlockManagerMaster returns list of nodes excluding the id of the node requesting it (that is, self). Nonetheless getPeers can return empty list (e.g., local mode, with only one BlockManager). And can also happen in the bootstrapping time. However, current Spark already suffers from this problem. In fact its much worse. 
Currently, the peer list is only fetched once from master (upon first-time replication)
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-56553233 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20711/consoleFull) for PR 2366 at commit [`89f91a0`](https://github.com/apache/spark/commit/89f91a0109bd7d8b988bcaf800170bd82e1678f7). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17924427 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds +val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + +cachedPeers.synchronized { --- End diff -- Ah yes, good point. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-56561115 @rxin, @mridulm I have slightly changed the logic to select one peer at random to make it more efficient. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-56563234 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20711/consoleFull) for PR 2366 at commit [`89f91a0`](https://github.com/apache/spark/commit/89f91a0109bd7d8b988bcaf800170bd82e1678f7). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * ` case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster` * `logInfo("Interrupting user class to stop.")` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-56563250 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20711/ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17927862 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +790,111 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): mutable.HashSet[BlockManagerId] = { +cachedPeers.synchronized { + val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds + val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + if (cachedPeers.isEmpty || forceFetch || timeout) { +cachedPeers.clear() +cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } +} +cachedPeers + } --- End diff -- There is an MT bug here. Since cachedPeers is updated in place, it is possible for 'previous' invocation to be using cachedPeers while the next invocation is clearing/updating it. We can avoid that by overwriting cachedPeers instance variable with result of master.getPeers. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-56566367 @tdas In case I did not mention it before :-) this is definitely a great improvement over what existed earlier! I would love it if we could (sometime soon I hope) add support for re-replication of blocks due to lost executors, which currently is outside the scope of this PR, it seems. Other than the MT bug I mentioned above, this looks good to me! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-56573235 If this is getting more complicated, we should consider standardizing the internal API and then building a separate service that properly handles all these issues. That service can also handle serving shuffle blocks. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18117922 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +790,111 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): mutable.HashSet[BlockManagerId] = { +cachedPeers.synchronized { + val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds + val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + if (cachedPeers.isEmpty || forceFetch || timeout) { +cachedPeers.clear() +cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } +} +cachedPeers + } --- End diff -- Good point. Then we would need a separate locking object for synchronizing this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-57032133 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20888/consoleFull) for PR 2366 at commit [`012afa3`](https://github.com/apache/spark/commit/012afa32f0f009e43eb6da28036087cc5264a7b3). * This patch merges cleanly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-57035730 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20888/consoleFull) for PR 2366 at commit [`012afa3`](https://github.com/apache/spark/commit/012afa32f0f009e43eb6da28036087cc5264a7b3). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * ` case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-57035733 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20888/ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-57042546 @rxin I don't think it is getting more complicated than the status quo. The complexity of fetching and caching of peers is contained in this one method `getPeers`, so it should be reasonably self-contained and easy to replace with a different implementation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-57042584 No, I was responding to Mridul's rebalancing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18122027 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +791,110 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { +peerFetchLock.synchronized { + val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds --- End diff -- this is a constant - so why not just put it outside of this function? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18122030 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -111,6 +112,9 @@ private[spark] class BlockManager( MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf) private val broadcastCleaner = new MetadataCleaner( MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf) + @volatile private var cachedPeers: Seq[BlockManagerId] = _ --- End diff -- add a blank line so we have some logical separation. even better if you can add some inline comment ... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18122031 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +791,110 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { +peerFetchLock.synchronized { + val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds --- End diff -- actually probably no big deal to leave this here --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18122034 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +791,110 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { +peerFetchLock.synchronized { + val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds + val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + if (cachedPeers == null || forceFetch || timeout) { +cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } + cachedPeers +} + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersForReplication = new ArrayBuffer[BlockManagerId] --- End diff -- not that big of a deal, but maybe you can reduce the initial size of the array buffer to 3 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r18122036 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -787,31 +789,88 @@ private[spark] class BlockManager( } /** + * Get peer block managers in the system. + */ + private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { +val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds +val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl + +cachedPeers.synchronized { + if (cachedPeers.isEmpty || forceFetch || timeout) { +cachedPeers.clear() +cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode) +lastPeerFetchTime = System.currentTimeMillis +logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]")) + } +} +cachedPeers + } + + /** * Replicate block to another node. */ - @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { +val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) +val numPeersToReplicateTo = level.replication - 1 +val peersReplicatedTo = new HashSet[BlockManagerId] +val peersFailedToReplicateTo = new HashSet[BlockManagerId] val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1) -if (cachedPeers == null) { - cachedPeers = master.getPeers(blockManagerId, level.replication - 1) +val startTime = System.nanoTime --- End diff -- i guess this hasn't happened yet? :p --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org