Hey Imran, 

 Thanks for the great explanation! It cleared up a lot for me. I am working on 
a subsystem, written in Scala and Java, that can be integrated with Spark and 
other Big Data solutions. To integrate it with Spark, I am trying to use RDDs 
and the functions passed to the reduce method inside my own system. In Spark, 
I have seen that the function passed to reduce gets serialized together with 
the RDD and sent to the worker nodes, which deserialize them and run the task. 
I see this happening in ResultTask.scala. When I try to do something similar, 
I get exceptions. My system has the Spark jars on its build path, so it is 
able to create a SparkContext etc. 
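
For reference, the serialize-then-deserialize step described above can be sketched without Spark at all. This is a minimal sketch using plain Java serialization; the names ClosureRoundTrip, serialize and deserialize are hypothetical, a Vector stands in for the RDD, and none of this is Spark's actual closure serializer code.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical helper, not part of Spark.
object ClosureRoundTrip {
  // Serialize any object (including a Scala function value) to bytes.
  def serialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Deserialize, resolving classes through the context class loader first
  // and falling back to the default lookup if that fails.
  def deserialize[T](bytes: Array[Byte]): T = {
    val loader = Thread.currentThread.getContextClassLoader
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, loader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val data = Vector(1, 2, 3, 4) // stand-in for the RDD
    val func: Iterator[Int] => Int = iter => iter.reduceLeft(_ + _)
    val bytes = serialize((data, func))
    val (data2, func2) = deserialize[(Vector[Int], Iterator[Int] => Int)](bytes)
    println(func2(data2.iterator)) // prints 10
  }
}
```

The receiving side can only rebuild the function if the class that defines it is visible to the class loader used during deserialization.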

When I do the following (similar to DAGScheduler.scala),

val bytes = closureSerializer.serialize((rdd, func): AnyRef).array()
val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)](
  ByteBuffer.wrap(bytes), Thread.currentThread.getContextClassLoader)
println(func2(context, rdd2.iterator(rdd2.partitions(1), context)))

I get the proper result and can print it out. 

But when I involve the network (serialize the data, send it over the network 
to a different program, then deserialize it there and call the function), 
I get the following error:

Exception in thread "main" java.lang.NullPointerException
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
        at SimpleApp$$anonfun$1.apply(SimpleApp.scala:31)
        at SimpleApp$$anonfun$1.apply(SimpleApp.scala:30)
        at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
        at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
        at SimpleApp$.net(SimpleApp.scala:71)
        at SimpleApp$.main(SimpleApp.scala:76)
        at SimpleApp.main(SimpleApp.scala)
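
One general Java serialization behavior that can produce a NullPointerException after a round trip is that fields marked @transient are not written out and come back as null. Whether that is what happens in the trace above is an assumption, not something the trace confirms. A minimal sketch, where PartitionHolder and TransientDemo are hypothetical names and not Spark classes:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for an object that keeps a reference to local runtime state.
class PartitionHolder(val values: Vector[Int], @transient val runtime: AnyRef)
    extends Serializable

object TransientDemo {
  // Serialize an object to bytes and immediately read it back.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new PartitionHolder(Vector(1, 2, 3), new Object)
    val copy = roundTrip(original)
    println(copy.values)          // ordinary fields survive the round trip
    println(copy.runtime == null) // transient fields are not restored
  }
}
```

Any method on the copy that dereferences the transient field without a null check would fail in the receiving program.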

I have also made sure that the class file of the program that sends the 
serialized data is in the bin folder of the program that receives it. I’m not 
sure what I am doing wrong; I serialize and create the function much like 
Spark does. I wrote another reduce function, shown below. Run this way, it 
prints the result of func2 correctly, but when I involve the network by 
sending the serialized data to another program, I get the above exception. 

  def reduceMod(f: (Integer, Integer) => Integer): Integer = {
    // Reduce a single partition; an empty partition yields None.
    val reducePartition: Iterator[Integer] => Option[Integer] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(f))
      } else {
        None
      }
    }
    val processFunc = (context: TaskContext, iter: Iterator[Integer]) => reducePartition(iter)
    // Unchecked cast: processFunc actually returns Option[Integer], not Int.
    val func = processFunc.asInstanceOf[(TaskContext, Iterator[Int]) => Int]
    context = new TaskContextImpl(stageId = 1, partitionId = 1,
      taskAttemptId = 1, attemptNumber = 1, runningLocally = false)
    println(func.getClass.getName)
    println(func(context, rdd.iterator(rdd.partitions(1), context)))
    // Round-trip the RDD and the function through the closure serializer.
    val bb = closureSerializer.serialize((rdd, func): AnyRef).array()
    val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)](
      ByteBuffer.wrap(bb), Thread.currentThread.getContextClassLoader)
    println(func2(context, rdd2.iterator(rdd2.partitions(1), context)))
    1
  }
 
I was wondering if you had any ideas on what I am doing wrong, or how I can 
properly send the serialized version of the RDD and function to my other 
program. My thought is that I might need to add more jars to the build path, 
but I have no clue if that's the issue or which jars I would need to add. 
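
On the transport side, one thing that can be ruled out independently of Spark is a partial read of the serialized bytes. A minimal sketch of length-prefixed framing over a socket follows. The message does not say how the bytes are actually sent, so FramedTransfer, send, receive and loopback are all hypothetical names, and this does not by itself address the exception above.

```scala
import java.io.{DataInputStream, DataOutputStream}
import java.net.{InetAddress, ServerSocket, Socket}

// Hypothetical helper: frames a byte array with a 4-byte length prefix.
object FramedTransfer {
  // Write the payload length, then the payload itself.
  def send(socket: Socket, payload: Array[Byte]): Unit = {
    val out = new DataOutputStream(socket.getOutputStream)
    out.writeInt(payload.length)
    out.write(payload)
    out.flush()
  }

  // Read the length, then block until exactly that many bytes have arrived.
  def receive(socket: Socket): Array[Byte] = {
    val in = new DataInputStream(socket.getInputStream)
    val payload = new Array[Byte](in.readInt())
    in.readFully(payload)
    payload
  }

  // Send a payload to ourselves over the loopback interface and return what arrives.
  def loopback(payload: Array[Byte]): Array[Byte] = {
    val server = new ServerSocket(0) // port 0 lets the OS choose a free port
    try {
      val sender = new Thread(new Runnable {
        def run(): Unit = {
          val client = new Socket(InetAddress.getLoopbackAddress, server.getLocalPort)
          try send(client, payload) finally client.close()
        }
      })
      sender.start()
      val connection = server.accept()
      try receive(connection) finally { connection.close(); sender.join() }
    } finally server.close()
  }

  def main(args: Array[String]): Unit = {
    val payload = Array.tabulate[Byte](100000)(i => (i % 127).toByte)
    println(loopback(payload).sameElements(payload))
  }
}
```

Comparing the received array with the one passed to the serializer on the sending side shows whether the transport or the deserialization step is at fault.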

Thanks,
Raghav

> On Apr 13, 2015, at 10:22 PM, Imran Rashid <iras...@cloudera.com> wrote:
> 
> On the worker side, it all happens in Executor.  The task result is computed 
> here:
> 
> https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210
> 
> then it's serialized along with some other goodies, and finally sent back to 
> the driver here:
> 
> https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L255
> 
> What happens on the driver is quite a bit more complicated, and involves a 
> number of spots in the code, but at least to get you started, the results are 
> received here:
> 
> https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L328
> 
> though perhaps a more interesting spot is where they are handled in 
> DAGScheduler#handleTaskCompletion:
> https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1001
> 
> 
> also, I think I know what you mean, but just to make sure: I wouldn't say the 
> results from the worker are "broadcast" back to the driver.  (a) in spark, 
> "broadcast" tends to refer to a particular api for sharing immutable data 
> from the driver to the workers (only one direction) and (b) it doesn't really 
> fit a more general meaning of "broadcast" either, since the results are sent 
> only to the driver, not to all nodes.
> 
> On Sun, Mar 29, 2015 at 8:34 PM, raggy <raghav0110...@gmail.com> wrote:
> I am a PhD student working on a research project related to Apache Spark. I
> am trying to modify some of the spark source code such that instead of
> sending the final result RDD from the worker nodes to a master node, I want
> to send the final result RDDs to some different node. In order to do this, I
> have been trying to identify at which point the Spark worker nodes broadcast
> the results of a job back to the master.
> 
> So far, I understand that in Spark, the master serializes the RDD and the
> functions to be applied on them and sends them over to the worker nodes. In
> the context of reduce, it serializes the RDD partition and the reduce
> function and sends them to the worker nodes. However, my understanding of
> how things happen at the worker node is very limited and I would appreciate
> it if someone could help me identify where the process of broadcasting the
> results of local worker computations back to the master node takes place.
> 
> This is some of the limited knowledge that I have about the worker nodes:
> 
> Each job gets divided into smaller sets of tasks called stages. Each Stage
> is either a Shuffle Map Stage or Result Stage. In a Shuffle Map Stage, the
> task results are used as input for another stage. The result stage uses the
> RDD to compute the action that initiated the job. So, this result stage
> executes the last task for the job on the worker node. I would assume after
> this is done, it gets the result and broadcasts it to the driver
> application (the master).
> 
> In ResultTask.scala (spark-core src/main/scala org.apache.spark.scheduler) it
> states "A task that sends back the output to the driver application.".
> However, I don't see when or where this happens in the source code. I would
> very much appreciate it if someone could help me identify where this happens
> in the Spark source code.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Task-result-in-Spark-Worker-Node-tp22283.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 