Hi, var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) { val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1) if (cachedPeers == null) { cachedPeers = master.getPeers(blockManagerId, level.replication - 1) } for (peer: BlockManagerId <- cachedPeers) { val start = System.nanoTime data.rewind() logDebug("Try to replicate BlockId " + blockId + " once; The size of the data is " + data.limit() + " Bytes. To node: " + peer) if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel), new ConnectionManagerId(peer.host, peer.port))) { logError("Failed to call syncPutBlock to " + peer) } logDebug("Replicated BlockId " + blockId + " once used " + (System.nanoTime - start) / 1e6 + " s; The size of the data is " + data.limit() + " bytes.") }
I get the flow of this code. But, I dont find any method being called for actually writing the data into the set of peers chosen for replication. Where exaclty is the replication happening? Thank you!! -Karthik