My apologies; I pasted the wrong exception trace in the previous email. Here is the actual exception I am receiving:
Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:154)
    at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

> On Apr 17, 2015, at 2:30 AM, Raghav Shankar <raghav0110...@gmail.com> wrote:
>
> Hey Imran,
>
> Thanks for the great explanation! This cleared up a lot of things for me. I am actually trying to use some of the features within Spark for a system I am developing. I am currently developing a subsystem that can be integrated with Spark and other big data solutions, and in order to integrate it with Spark, I am trying to use the RDDs and the functions provided to the reduce method within my system. My system is developed in Scala and Java. In Spark, I have seen that the function provided to the reduce method, along with the RDD, gets serialized and sent to the worker nodes, and the worker nodes are able to deserialize them and execute the task. I see this happening in ResultTask.scala. When I try to do something similar, I get exceptions. The system I am developing has the Spark jars on its build path, so it is able to create a SparkContext, etc.
>
> When I do the following (similar to what DAGScheduler.scala does):
>
>   val bytes = closureSerializer.serialize((rdd, func): AnyRef).array()
>   val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)](
>     ByteBuffer.wrap(bytes), Thread.currentThread.getContextClassLoader)
>   println(func2(context, rdd2.iterator(rdd2.partitions(1), context)))
>
> I get the proper result and can print it out.
>
> But when I involve the network, serializing the data, sending it to a different program, and then deserializing the data and applying the function there, I get the following error:
>
> Exception in thread "main" java.lang.NullPointerException
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
>     at SimpleApp$$anonfun$1.apply(SimpleApp.scala:31)
>     at SimpleApp$$anonfun$1.apply(SimpleApp.scala:30)
>     at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
>     at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37)
>     at SimpleApp$.net(SimpleApp.scala:71)
>     at SimpleApp$.main(SimpleApp.scala:76)
>     at SimpleApp.main(SimpleApp.scala)
>
> I have also made sure to add the class file of the program that sends the serialized data to the bin folder of the program that receives it. I'm not sure what I am doing wrong; I've done the serialization and the creation of the function the same way Spark does it. I also created another reduce function, shown below. Implemented this way, it prints the result of func2 properly, but as soon as I involve the network by sending the serialized data to another program, I get the above exception.
>
>   def reduceMod(f: (Integer, Integer) => Integer): Integer = {
>     val reducePartition: Iterator[Integer] => Option[Integer] = iter => {
>       if (iter.hasNext) {
>         Some(iter.reduceLeft(f))
>       } else {
>         None
>       }
>     }
>     val processFunc = (context: TaskContext, iter: Iterator[Integer]) => reducePartition(iter)
>     val func = processFunc.asInstanceOf[(TaskContext, Iterator[Int]) => Int]
>     context = new TaskContextImpl(stageId = 1, partitionId = 1,
>       taskAttemptId = 1, attemptNumber = 1, runningLocally = false)
>     println(func.getClass.getName)
>     println(func(context, rdd.iterator(rdd.partitions(1), context)))
>     val bb = closureSerializer.serialize((rdd, func): AnyRef).array()
>     val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int)](
>       ByteBuffer.wrap(bb), Thread.currentThread.getContextClassLoader)
>     println(func2(context, rdd3.iterator(rdd3.partitions(1), context)))
>     1
>   }
>
> I was wondering if you had any ideas on what I am doing wrong, or how I can properly send the serialized form of the RDD and function to my other program. My thought is that I might need to add more jars to the build path, but I have no clue whether that is the issue or which jars I would need to add.
>
> Thanks,
> Raghav
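[Editor's sketch] The trace at the top of this message fits an RDD that lost its transient driver-side state in transit: ParallelCollectionRDD.getPartitions re-slices the original collection, which exists only in the driver JVM, so calling partitions on the deserialized copy in another process hits a null inside slice(). Spark itself sidesteps this when it ships a ResultTask: the Partition object is computed on the driver and serialized along with the task, so the executor never calls partitions on the deserialized RDD. Below is a minimal sketch of that packaging under stated assumptions: Spark 1.3-era APIs, JavaSerializer standing in for SparkEnv's closure serializer, the object name invented, and the round trip kept in one JVM, so it demonstrates the packaging rather than the cross-JVM transfer itself.

import java.nio.ByteBuffer

import org.apache.spark.{Partition, SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.JavaSerializer

object ShipPartitionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("ship-partition-sketch"))
    val rdd = sc.parallelize(1 to 100, 4)
    val func = (ctx: TaskContext, it: Iterator[Int]) => it.sum

    // Stand-in for SparkEnv.get.closureSerializer.
    val ser = new JavaSerializer(sc.getConf).newInstance()

    // Ship the Partition object along with the RDD and the function, the way
    // ResultTask does. The Partition is computed here, on the driver, while
    // the RDD's transient driver-side state is still live.
    val bytes = ser.serialize((rdd, func, rdd.partitions(1)): AnyRef).array()

    // Receiving side: use the shipped Partition directly. Calling
    // rdd2.partitions instead would run getPartitions on the deserialized
    // RDD; for a ParallelCollectionRDD that re-slices the original transient
    // data seq, which no longer exists in another JVM, hence the NPE in slice().
    val (rdd2, func2, part) =
      ser.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Int, Partition)](
        ByteBuffer.wrap(bytes), Thread.currentThread.getContextClassLoader)

    // A live TaskContext is still needed before calling
    // func2(ctx, rdd2.iterator(part, ctx)); a null or hand-built context is
    // one plausible source of the NPE in InterruptibleIterator.hasNext.
    sc.stop()
  }
}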
>> On Apr 13, 2015, at 10:22 PM, Imran Rashid <iras...@cloudera.com> wrote:
>>
>> On the worker side, it all happens in Executor. The task result is computed here:
>>
>> https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210
>>
>> Then it's serialized along with some other goodies, and finally sent back to the driver here:
>>
>> https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L255
>>
>> What happens on the driver is quite a bit more complicated and involves a number of spots in the code, but at least to get you started, the results are received here:
>>
>> https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L328
>>
>> though perhaps a more interesting spot is where they are handled in DAGScheduler#handleTaskCompletion:
>>
>> https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1001
>>
>> Also, I think I know what you mean, but just to make sure: I wouldn't say the results from the worker are "broadcast" back to the driver. (a) In Spark, "broadcast" tends to refer to a particular API for sharing immutable data from the driver to the workers (one direction only), and (b) it doesn't really fit the more general meaning of "broadcast" either, since the results are sent only to the driver, not to all nodes.
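[Editor's sketch] For anyone following Imran's second Executor link above: at that point the 1.3-era executor does not always put the result bytes directly in its reply. If the serialized result exceeds the maximum RPC frame size, it stores the bytes in the block manager and sends the driver only a reference, which the driver then fetches separately (Spark's private DirectTaskResult and IndirectTaskResult classes). The standalone model below is illustrative only; the type and method names are invented, and no real BlockManager is involved.

// Illustrative model of the size check the 1.3-era Executor performs before
// replying to the driver. Names are made up; Spark's real types are the
// private[spark] DirectTaskResult and IndirectTaskResult.
sealed trait ModeledTaskResult
case class Direct(resultBytes: Array[Byte]) extends ModeledTaskResult
case class Indirect(blockId: String, size: Long) extends ModeledTaskResult

object ResultPackaging {
  def packageResult(taskId: Long,
                    resultBytes: Array[Byte],
                    maxDirectSize: Long): ModeledTaskResult = {
    if (resultBytes.length <= maxDirectSize) {
      // Small result: the bytes travel inside the status update itself.
      Direct(resultBytes)
    } else {
      // Large result: in real Spark the bytes go into the BlockManager and
      // only a block reference is sent; the driver fetches it afterwards.
      Indirect(s"taskresult_$taskId", resultBytes.length.toLong)
    }
  }
}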
>>
>> On Sun, Mar 29, 2015 at 8:34 PM, raggy <raghav0110...@gmail.com> wrote:
>> I am a PhD student working on a research project related to Apache Spark. I am trying to modify some of the Spark source code so that instead of sending the final result RDD from the worker nodes to a master node, I can send the final result RDDs to some different node. In order to do this, I have been trying to identify at which point the Spark worker nodes broadcast the results of a job back to the master.
>>
>> So far, I understand that in Spark the master serializes the RDD and the functions to be applied on it and sends them over to the worker nodes. In the context of reduce, it serializes the RDD partition and the reduce function and sends them to the worker nodes. However, my understanding of how things happen at the worker node is very limited, and I would appreciate it if someone could help me identify where the process of broadcasting the results of local worker computations back to the master node takes place.
>>
>> This is some of the limited knowledge that I have about the worker nodes:
>>
>> Each job gets divided into smaller sets of tasks called stages. Each stage is either a shuffle map stage or a result stage. In a shuffle map stage, the task results are used as input for another stage. The result stage uses the RDD to compute the action that initiated the job, so this result stage executes the last task for the job on the worker node. I would assume that after this is done, it gets the result and broadcasts it to the driver application (the master).
>>
>> In ResultTask.scala (spark-core, src/main/scala, org.apache.spark.scheduler) it states: "A task that sends back the output to the driver application." However, I don't see when or where this happens in the source code. I would very much appreciate it if someone could help me identify where this happens in the Spark source code.
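[Editor's sketch] On the original question of routing results somewhere other than the master: short of patching the scheduler, a coarse public-API approximation in 1.3-era Spark is SparkContext.runJob with a resultHandler, the same driver-side hook that collect() and reduce() build on, downstream of the TaskSchedulerImpl and DAGScheduler code linked above. Results still travel worker to driver, but the handler can forward each partition's result to any other node. A sketch under those assumptions; the object name, collector host, and port are placeholders, and the allowLocal parameter reflects the 1.3-era runJob signature.

import java.io.ObjectOutputStream
import java.net.Socket

import org.apache.spark.{SparkConf, SparkContext}

object ForwardResultsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("forward-results-sketch"))
    val rdd = sc.parallelize(1 to 1000, 4)

    // Hypothetical receiving node; not part of the Spark cluster.
    val sock = new Socket("collector-host", 9999)
    val out = new ObjectOutputStream(sock.getOutputStream)

    // resultHandler(partitionId, result) runs on the driver as each task
    // finishes; here it forwards every per-partition sum to the collector.
    sc.runJob[Int, Int](
      rdd,
      (it: Iterator[Int]) => it.sum,
      0 until rdd.partitions.length,
      false, // allowLocal
      (partitionId: Int, partialSum: Int) => {
        out.writeObject((partitionId, partialSum))
        out.flush()
      })

    out.close()
    sc.stop()
  }
}

Note that this does not take the driver out of the data path; doing that genuinely requires changing how task results are reported, along the Executor and scheduler code paths Imran pointed to above.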