Looks like that's BlockManagerWorker.syncPutBlock(), which is in an if
check, perhaps obscuring its existence.
On Fri, Sep 5, 2014 at 2:19 AM, rapelly kartheek kartheek.m...@gmail.com
wrote:
Hi,
var cachedPeers: Seq[BlockManagerId] = null
private def replicate(blockId: String, data: ByteBuffer, level:
StorageLevel) {
val tLevel = StorageLevel(level.useDisk, level.useMemory,
level.deserialized, 1)
if (cachedPeers == null) {
cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
}
for (peer: BlockManagerId - cachedPeers) {
val start = System.nanoTime
data.rewind()
logDebug(Try to replicate BlockId + blockId + once; The size of
the data is
+ data.limit() + Bytes. To node: + peer)
if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data,
tLevel),
new ConnectionManagerId(peer.host, peer.port))) {
logError(Failed to call syncPutBlock to + peer)
}
logDebug(Replicated BlockId + blockId + once used +
(System.nanoTime - start) / 1e6 + s; The size of the data is +
data.limit() + bytes.)
}
I get the flow of this code. But, I dont find any method being called for
actually writing the data into the set of peers chosen for replication.
Where exaclty is the replication happening?
Thank you!!
-Karthik