Hi,
I am trying to understand how
/spark/*/Storage/BlockManagerMaster.askDriverWithReply() works.
    def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
      val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
      if (result.length != numPeers) {
        throw new SparkException(
          "Error getting peers, only got " + result.size + " instead of " + numPeers)
      }
      result
    }
Here, getPeers calls askDriverWithReply().
    private def askDriverWithReply[T](message: Any): T = {
      // TODO: Consider removing multiple attempts
      if (driverActor == null) {
        throw new SparkException(
          "Error sending message to BlockManager as driverActor is null " +
          "[message = " + message + "]")
      }
      var attempts = 0
      var lastException: Exception = null
      while (attempts < AKKA_RETRY_ATTEMPTS) {
        attempts += 1
        try {
          val future = driverActor.ask(message)(timeout)
          val result = Await.result(future, timeout)
          if (result == null) {
            throw new SparkException("BlockManagerMaster returned null")
          }
          return result.asInstanceOf[T]
        } catch {
          case ie: InterruptedException => throw ie
          case e: Exception =>
            lastException = e
            logWarning(
              "Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
        }
        Thread.sleep(AKKA_RETRY_INTERVAL_MS)
      }
      throw new SparkException(
        "Error sending message to BlockManagerMaster [message = " + message + "]",
        lastException)
    }
Here, the getPeers method calls askDriverWithReply() with the message
GetPeers(), and the driver returns the BlockManagerIds.
    val future = driverActor.ask(message)(timeout)
    val result = Await.result(future, timeout)
Here, we obtain "result". But I couldn't find the definition of ask() that
processes the GetPeers() message. Can someone please tell me how and where
"result" is being constructed?
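
To make the question concrete, here is a minimal sketch of the request/reply shape I am expecting to find somewhere on the driver side. It uses plain scala.concurrent rather than Akka, and every name in it (ToyDriver, its receive and ask methods, the string peer ids, this GetPeers case class) is hypothetical and only for illustration; none of it is Spark's code. My assumption is that the real reply is built by a handler that pattern-matches on GetPeers, and that ask() itself only hands back a Future for that reply.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in for the message; NOT Spark's GetPeers class.
case class GetPeers(blockManagerId: String, numPeers: Int)

// Hypothetical stand-in for whatever sits behind driverActor.
object ToyDriver {
  private val known = Seq("bm-1", "bm-2", "bm-3")

  // Plays the role of a receive handler: match on the message, build the reply.
  private def receive(message: Any): Any = message match {
    case GetPeers(id, n) => known.filterNot(_ == id).take(n)
  }

  // Plays the role of ask: return a Future that carries the handler's reply
  // (or its failure, e.g. a MatchError for an unknown message).
  def ask(message: Any): Future[Any] = {
    val reply = Promise[Any]()
    reply.complete(Try(receive(message)))
    reply.future
  }
}

object AskSketch {
  def main(args: Array[String]): Unit = {
    // Same two steps as in askDriverWithReply: ask, then block on the Future.
    val future = ToyDriver.ask(GetPeers("bm-1", 2))
    val result = Await.result(future, 5.seconds)
    println(result.asInstanceOf[Seq[String]])
  }
}
```

In this toy version, "result" is constructed inside receive(), not inside ask(). What I cannot locate is the corresponding place in Spark.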
Thank you!!
Karthik