Re: question on replicate() in blockManager.scala

2014-09-06 Thread Aaron Davidson
Looks like that's BlockManagerWorker.syncPutBlock(), which is in an if
check, perhaps obscuring its existence.


On Fri, Sep 5, 2014 at 2:19 AM, rapelly kartheek kartheek.m...@gmail.com
wrote:

 Hi,

 var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: String, data: ByteBuffer, level:
 StorageLevel) {
 val tLevel = StorageLevel(level.useDisk, level.useMemory,
 level.deserialized, 1)
 if (cachedPeers == null) {
   cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
 }
 for (peer: BlockManagerId - cachedPeers) {
   val start = System.nanoTime
   data.rewind()
   logDebug(Try to replicate BlockId  + blockId +  once; The size of
 the data is 
 + data.limit() +  Bytes. To node:  + peer)
   if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data,
 tLevel),
 new ConnectionManagerId(peer.host, peer.port))) {
 logError(Failed to call syncPutBlock to  + peer)
   }
   logDebug(Replicated BlockId  + blockId +  once used  +
 (System.nanoTime - start) / 1e6 +  s; The size of the data is  +
 data.limit() +  bytes.)
 }


 I get the flow of this code. But, I dont find any method being called for
 actually writing the data into the set of peers chosen for replication.

 Where exaclty is the replication happening?

 Thank you!!
 -Karthik



question on replicate() in blockManager.scala

2014-09-05 Thread rapelly kartheek
Hi,

var cachedPeers: Seq[BlockManagerId] = null
  private def replicate(blockId: String, data: ByteBuffer, level:
StorageLevel) {
val tLevel = StorageLevel(level.useDisk, level.useMemory,
level.deserialized, 1)
if (cachedPeers == null) {
  cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
}
for (peer: BlockManagerId - cachedPeers) {
  val start = System.nanoTime
  data.rewind()
  logDebug(Try to replicate BlockId  + blockId +  once; The size of
the data is 
+ data.limit() +  Bytes. To node:  + peer)
  if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel),
new ConnectionManagerId(peer.host, peer.port))) {
logError(Failed to call syncPutBlock to  + peer)
  }
  logDebug(Replicated BlockId  + blockId +  once used  +
(System.nanoTime - start) / 1e6 +  s; The size of the data is  +
data.limit() +  bytes.)
}


I get the flow of this code. But, I dont find any method being called for
actually writing the data into the set of peers chosen for replication.

Where exaclty is the replication happening?

Thank you!!
-Karthik