My apologies, I had pasted the wrong exception trace in the previous email. 
Here is the actual exception that I am receiving. 

Exception in thread "main" java.lang.NullPointerException
        at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:154)
        at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

> On Apr 17, 2015, at 2:30 AM, Raghav Shankar <raghav0110...@gmail.com> wrote:
> 
> Hey Imran, 
> 
>  Thanks for the great explanation! It cleared up a lot of things for me. I am 
> trying to use some of Spark's features in a system I am developing: a subsystem, 
> written in Scala and Java, that can be integrated with Spark and other big data 
> solutions. To integrate it with Spark, I want to use the RDDs and the functions 
> passed to the reduce method in my own system. In Spark, I have seen that the 
> function provided to reduce, along with the RDD, gets serialized and sent to the 
> worker nodes, which deserialize them and then execute the task; I see this 
> happening in ResultTask.scala. When I try to do something similar, I get 
> exceptions. The system I am developing has the Spark jars on its build path, so 
> it is able to create a SparkContext, etc. 
> 
> When I do, 
> 
> val bytes = closureSerializer.serialize((rdd, func): AnyRef).array()  // similar to DAGScheduler.scala
> val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)](
>       ByteBuffer.wrap(bytes), Thread.currentThread.getContextClassLoader)
> println(func2(context, rdd2.iterator(rdd2.partitions(1), context)))
> 
> I get the proper result and can print it out. 
> 
> But when the network is involved, that is, when I serialize the data, send it to 
> a different program, and then deserialize it and apply the function there, I get 
> the following error:
> 
> Exception in thread "main" java.lang.NullPointerException
>       at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
>       at SimpleApp$$anonfun$1.apply(SimpleApp.scala:31)
>       at SimpleApp$$anonfun$1.apply(SimpleApp.scala:30)
>       at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
>       at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
>       at SimpleApp$.net(SimpleApp.scala:71)
>       at SimpleApp$.main(SimpleApp.scala:76)
>       at SimpleApp.main(SimpleApp.scala)
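> 
> For reference, the network hop is essentially the following (a rough sketch rather 
> than my exact code; the host, port, and length-prefixed framing are placeholders 
> for my actual transport):
> 
> // sender side: serialize as above, then push the bytes over a plain TCP socket
> import java.io.DataOutputStream
> import java.net.Socket
> 
> val payload = closureSerializer.serialize((rdd, func): AnyRef).array()
> val socket = new Socket("localhost", 9999)              // placeholder endpoint
> val out = new DataOutputStream(socket.getOutputStream)
> out.writeInt(payload.length)                            // length-prefix the payload
> out.write(payload)
> out.flush()
> socket.close()
> 
> // the receiving program reads the same length-prefixed payload into a byte array
> // and then calls closureSerializer.deserialize[...] on it, as in the snippet above.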
> 
> I have also made sure to add the class file of the program that sends the 
> serialized data to the bin folder of the program that receives it. I'm not sure 
> what I am doing wrong; I've done the serialization and the construction of the 
> function much as Spark does. I also wrote another reduce function, shown below. 
> Implemented this way, it prints the result of func2 properly, but as soon as the 
> serialized data is sent over the network to another program, I get the above 
> exception. 
> 
>    def reduceMod(f: (Integer, Integer) => Integer): Integer = {
>      val reducePartition: Iterator[Integer] => Option[Integer] = iter => {
>        if (iter.hasNext) {
>          Some(iter.reduceLeft(f))
>        } else {
>          None
>        }
>      }
>      val processFunc = (context: TaskContext, iter: Iterator[Integer]) => reducePartition(iter)
>      val func = processFunc.asInstanceOf[(TaskContext, Iterator[Int]) => Int]
>      context = new TaskContextImpl(stageId = 1, partitionId = 1,
>        taskAttemptId = 1, attemptNumber = 1, runningLocally = false)
>      println(func.getClass.getName)
>      // works locally: apply the function directly to the partition iterator
>      println(func(context, rdd.iterator(rdd.partitions(1), context)))
>      // round-trip through the closure serializer, similar to what DAGScheduler does
>      val bb = closureSerializer.serialize((rdd, func): AnyRef).array()
>      val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)](
>        ByteBuffer.wrap(bb), Thread.currentThread.getContextClassLoader)
>      // still works within the same JVM after deserialization
>      println(func2(context, rdd3.iterator(rdd3.partitions(1), context)))
>      1
>    }
>  
> I was wondering if you had any ideas about what I am doing wrong, or how I can 
> properly send the serialized RDD and function to my other program. My guess is 
> that I might need to add more jars to the build path, but I have no idea whether 
> that is the issue or which jars I would need to add. 
> 
> Thanks,
> Raghav
> 
>> On Apr 13, 2015, at 10:22 PM, Imran Rashid <iras...@cloudera.com> wrote:
>> 
>> On the worker side, it all happens in Executor.  The task result is computed 
>> here:
>> 
>> https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210
>> 
>> then it's serialized, along with some other goodies, and finally sent back to 
>> the driver here:
>> 
>> https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L255
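>> 
>> In case it helps to see the shape of that path without digging through the file, 
>> here is a heavily simplified paraphrase (the traits below are hypothetical 
>> stand-ins for illustration only, not Spark's real classes):
>> 
>> import java.nio.ByteBuffer
>> 
>> trait TaskStub       { def run(taskId: Long): Any }             // stands in for scheduler.Task
>> trait SerializerStub { def serialize(value: Any): ByteBuffer }  // stands in for the result serializer
>> trait BackendStub    { def statusUpdate(taskId: Long, state: String, data: ByteBuffer): Unit }
>> 
>> def runAndReport(task: TaskStub, taskId: Long,
>>                  resultSer: SerializerStub, backend: BackendStub): Unit = {
>>   val value = task.run(taskId)                 // compute the task result (the first link above)
>>   val valueBytes = resultSer.serialize(value)  // serialize the value (plus accumulators, metrics, ...)
>>   // the real code wraps this in a DirectTaskResult, or an IndirectTaskResult via the
>>   // block manager when it is too large, before handing it to the backend (second link above)
>>   backend.statusUpdate(taskId, "FINISHED", valueBytes)
>> }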
>> 
>> What happens on the driver is quite a bit more complicated, and involves a 
>> number of spots in the code, but at least to get you started, the results 
>> are received here:
>> 
>> https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L328
>> 
>> though perhaps a more interesting spot is where they are handled in 
>> DAGScheduler#handleTaskCompletion:
>> https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1001
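>> 
>> Very roughly, and again with hypothetical stand-ins rather than the real classes, 
>> the driver-side hand-off looks like this:
>> 
>> import java.nio.ByteBuffer
>> 
>> trait ResultDeserializer { def deserialize[T](bytes: ByteBuffer): T }
>> 
>> // a result-getter thread deserializes the serialized task result received from the
>> // executor, then the DAGScheduler is told the task ended; for a ResultTask,
>> // handleTaskCompletion passes the value on to the job's listener.
>> def onTaskFinished[T](bytes: ByteBuffer,
>>                       deser: ResultDeserializer,
>>                       notifyDagScheduler: T => Unit): Unit = {
>>   val result = deser.deserialize[T](bytes)
>>   notifyDagScheduler(result)
>> }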
>> 
>> 
>> Also, I think I know what you mean, but just to make sure: I wouldn't say the 
>> results from the worker are "broadcast" back to the driver. (a) In Spark, 
>> "broadcast" tends to refer to a particular API for sharing immutable data from 
>> the driver to the workers (only one direction), and (b) it doesn't really fit a 
>> more general meaning of "broadcast" either, since the results are sent only to 
>> the driver, not to all nodes.
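>> 
>> For contrast, a minimal example of the broadcast API meant in (a), assuming an 
>> existing SparkContext named sc:
>> 
>> // immutable lookup data shared driver -> executors (one direction only)
>> val lookup = sc.broadcast(Map(1 -> "a", 2 -> "b"))
>> val tagged = sc.parallelize(Seq(1, 2, 3)).map(i => lookup.value.getOrElse(i, "?"))
>> println(tagged.collect().mkString(", "))  // the task results still come back only to the driver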
>> 
>> On Sun, Mar 29, 2015 at 8:34 PM, raggy <raghav0110...@gmail.com> wrote:
>> I am a PhD student working on a research project related to Apache Spark. I am
>> trying to modify some of the Spark source code so that, instead of the final
>> result RDD being sent from the worker nodes to a master node, it is sent to some
>> different node. In order to do this, I have been trying to identify the point at
>> which the Spark worker nodes broadcast the results of a job back to the master.
>> 
>> So far, I understand that in Spark the master serializes the RDD and the
>> functions to be applied to it and sends them over to the worker nodes. In the
>> context of reduce, it serializes the RDD partition and the reduce function and
>> sends them to the worker nodes. However, my understanding of what happens at the
>> worker node is very limited, and I would appreciate it if someone could help me
>> identify where the process of broadcasting the results of local worker
>> computations back to the master node takes place.
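>> 
>> For concreteness, a job of the kind I mean, assuming an existing SparkContext 
>> named sc:
>> 
>> // the driver ships the partition descriptions and the reduce closure to the workers;
>> // each worker reduces its partitions locally and sends a partial result back to the
>> // driver, which combines the partials into 5050.
>> val sum = sc.parallelize(1 to 100, 4).reduce(_ + _)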
>> 
>> This is some of the limited knowledge that I have about the worker nodes:
>> 
>> Each job gets divided into smaller sets of tasks called stages. Each stage is
>> either a shuffle map stage or a result stage. In a shuffle map stage, the task
>> results are used as input for another stage. The result stage uses the RDD to
>> compute the action that initiated the job, so the result stage executes the last
>> tasks of the job on the worker nodes. I would assume that after this is done,
>> each worker gets its result and broadcasts it to the driver application (the
>> master).
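>> 
>> For example, assuming an existing SparkContext named sc, the following job has one 
>> shuffle map stage and one result stage:
>> 
>> val counts = sc.parallelize(Seq("a", "b", "a"))
>>   .map(w => (w, 1))
>>   .reduceByKey(_ + _)  // shuffle boundary: this and the map run as the shuffle map stage
>>   .collect()           // the result stage; its task results are returned to the driver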
>> 
>> In ResultTask.scala (spark-core, src/main/scala, org.apache.spark.scheduler) it
>> says "A task that sends back the output to the driver application." However, I
>> don't see when or where this happens in the source code. I would very much
>> appreciate it if someone could help me identify where this happens in the Spark
>> source code.
>> 
>> 
>> 