Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2366#discussion_r17498486
  
    --- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -1228,4 +1240,212 @@ class BlockManagerSuite extends FunSuite with 
Matchers with BeforeAndAfter
         assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
         assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
       }
    +
    +  test("get peers with store addition and removal") {
    +    val numStores = 4
    +    val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, 
s"store$i") }
    +    val storeIds = stores.map { _.blockManagerId }.toSet
    +    assert(master.getPeers(stores(0).blockManagerId).toSet ===
    +      storeIds.filterNot { _ == stores(0).blockManagerId })
    +    assert(master.getPeers(stores(1).blockManagerId).toSet ===
    +      storeIds.filterNot { _ == stores(1).blockManagerId })
    +    assert(master.getPeers(stores(2).blockManagerId).toSet ===
    +      storeIds.filterNot { _ == stores(2).blockManagerId })
    +
    +    // Add driver store and test whether it is filtered out
    +    val driverStore = makeBlockManager(1000, "<driver>")
    +    assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver))
    +    assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver))
    +    assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver))
    +
    +    // Add a new store and test whether get peers returns it
    +    val newStore = makeBlockManager(1000, s"store$numStores")
    +    assert(master.getPeers(stores(0).blockManagerId).toSet ===
    +      storeIds.filterNot { _ == stores(0).blockManagerId } + 
newStore.blockManagerId)
    +    assert(master.getPeers(stores(1).blockManagerId).toSet ===
    +      storeIds.filterNot { _ == stores(1).blockManagerId } + 
newStore.blockManagerId)
    +    assert(master.getPeers(stores(2).blockManagerId).toSet ===
    +      storeIds.filterNot { _ == stores(2).blockManagerId } + 
newStore.blockManagerId)
    +    assert(master.getPeers(newStore.blockManagerId).toSet === storeIds)
    +
    +    // Remove a store and test whether get peers returns it
    +    val storeIdToRemove = stores(0).blockManagerId
    +    master.removeExecutor(storeIdToRemove.executorId)
    +    
assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove))
    +    
assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove))
    +    
assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove))
    +
    +    // Test whether asking for peers of a unregistered block manager id 
returns empty list
    +    assert(master.getPeers(stores(0).blockManagerId).isEmpty)
    +    assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty)
    +  }
    +
    +  test("block replication - 2x") {
    +    testReplication(2,
    +      Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, 
MEMORY_AND_DISK_SER_2)
    +    )
    +  }
    +
    +  test("block replication - 3x") {
    +    // Generate storage levels with 3x replication
    +    val storageLevels = {
    +      Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, 
MEMORY_AND_DISK_SER).map {
    +        level => StorageLevel(
    +          level.useDisk, level.useMemory, level.useOffHeap, 
level.deserialized, 3)
    +      }
    +    }
    +    testReplication(3, storageLevels)
    +  }
    +
    +  test("block replication - mixed between 1x to 5x") {
    +    // Generate storage levels with varying replication
    +    val storageLevels = Seq(
    +      MEMORY_ONLY,
    +      MEMORY_ONLY_SER_2,
    +      StorageLevel(true, false, false, false, 3),
    +      StorageLevel(true, true, false, true, 4),
    +      StorageLevel(true, true, false, false, 5),
    +      StorageLevel(true, true, false, true, 4),
    +      StorageLevel(true, false, false, false, 3),
    +      MEMORY_ONLY_SER_2,
    +      MEMORY_ONLY
    +    )
    +    testReplication(5, storageLevels)
    +  }
    +
    +  test("block replication with addition and deletion of executors") {
    +    val blockSize = 1000
    +    val storeSize = 10000
    +    val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, 
s"store$i") }
    +
    +    def testPut(blockId: String, storageLevel: StorageLevel, 
expectedNumLocations: Int) {
    +      try {
    +        initialStores(0).putSingle(blockId, new Array[Byte](blockSize), 
storageLevel)
    +        assert(master.getLocations(blockId).size === expectedNumLocations)
    +      } finally {
    +        master.removeBlock(blockId)
    +      }
    --- End diff --
    
    What's the point of this finally? If an assertion fails don't we just move 
on to the next test?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to