Sorry - I'll furnish some details below. However, union is not an option for
the business logic I have. The function generates a specific file based on a
variable passed in through a setter on the function, and that variable
changes with each RDD. I have annotated the log line where the first loop
iteration succeeds.
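
To make that concrete, the function object is shaped roughly like this (a
simplified sketch: it implements Spark's VoidFunction<String>, the setter
names are taken from the code at the end of this message, and everything
else is illustrative):

    import org.apache.spark.api.java.function.VoidFunction;

    // One instance is reused and reconfigured via setters before each action.
    public class GenerateJSONContentFunction implements VoidFunction<String> {
        private HdfsUtil hdfsUtil;   // HDFS helper injected from the driver
        private String tagName;      // changes with each RDD in the loop
        private String outputPath;   // changes with each RDD in the loop

        public void setHdfsUtil(HdfsUtil hdfsUtil) { this.hdfsUtil = hdfsUtil; }
        public void setTagName(String tagName) { this.tagName = tagName; }
        public void setOutputPath(String outputPath) { this.outputPath = outputPath; }

        @Override
        public void call(String id) throws Exception {
            // write the JSON file for this id under outputPath -- body elided
        }
    }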

Log output to give you some context:
...
14/10/16 15:32:25 INFO SparkContext: Starting job: count at
GenerateJSONContent.java:145
14/10/16 15:32:25 INFO DAGScheduler: Got job 0 (count at
GenerateJSONContent.java:145) with 1 output partitions (allowLocal=false)
14/10/16 15:32:25 INFO DAGScheduler: Final stage: Stage 0(count at
GenerateJSONContent.java:145)
14/10/16 15:32:25 INFO DAGScheduler: Parents of final stage: List()
14/10/16 15:32:25 INFO DAGScheduler: Missing parents: List()
14/10/16 15:32:25 INFO DAGScheduler: Submitting Stage 0
(ParallelCollectionRDD[2] at parallelize at GenerateJSONContent.java:82),
which has no missing parents
14/10/16 15:32:25 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0
(ParallelCollectionRDD[2] at parallelize at GenerateJSONContent.java:82)
14/10/16 15:32:25 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/10/16 15:32:25 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on
executor localhost: localhost (PROCESS_LOCAL)
14/10/16 15:32:25 INFO TaskSetManager: Serialized task 0.0:0 as 2048 bytes
in 3 ms
14/10/16 15:32:25 INFO Executor: Running task ID 0
14/10/16 15:32:25 INFO Executor: Fetching
http://172.16.1.204:42232/jars/rickshaw-spark-0.0.1-SNAPSHOT.jar with
timestamp 1413473544838
14/10/16 15:32:25 INFO Utils: Fetching
http://172.16.1.204:42232/jars/rickshaw-spark-0.0.1-SNAPSHOT.jar to
/tmp/fetchFileTemp7432615579770034188.tmp
14/10/16 15:32:26 INFO Executor: Adding
file:/tmp/spark-a471145f-ab44-447b-b48c-bb499024d756/rickshaw-spark-0.0.1-SNAPSHOT.jar
to class loader
14/10/16 15:32:26 INFO Executor: Serialized size of result for 0 is 597
14/10/16 15:32:26 INFO Executor: Sending result for 0 directly to driver
14/10/16 15:32:26 INFO Executor: Finished task ID 0
14/10/16 15:32:26 INFO DAGScheduler: Completed ResultTask(0, 0)
14/10/16 15:32:26 INFO DAGScheduler: Stage 0 (count at
GenerateJSONContent.java:145) finished in 0.791 s
14/10/16 15:32:26 INFO SparkContext: Job finished: count at
GenerateJSONContent.java:145, took 0.889171151 s
14/10/16 15:32:26 INFO GenerateJSONContent: Running for tag
:PublicationProductId
14/10/16 15:32:26 INFO TaskSetManager: Finished TID 0 in 778 ms on localhost
(progress: 1/1)
14/10/16 15:32:26 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
have all completed, from pool
14/10/16 15:32:26 INFO SparkContext: Starting job: foreach at
GenerateJSONContent.java:156
14/10/16 15:32:26 INFO DAGScheduler: Got job 1 (foreach at
GenerateJSONContent.java:156) with 1 output partitions (allowLocal=false)
14/10/16 15:32:26 INFO DAGScheduler: Final stage: Stage 1(foreach at
GenerateJSONContent.java:156)
14/10/16 15:32:26 INFO DAGScheduler: Parents of final stage: List()
14/10/16 15:32:26 INFO DAGScheduler: Missing parents: List()
14/10/16 15:32:26 INFO DAGScheduler: Submitting Stage 1
(ParallelCollectionRDD[2] at parallelize at GenerateJSONContent.java:82),
which has no missing parents
14/10/16 15:32:26 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1
(ParallelCollectionRDD[2] at parallelize at GenerateJSONContent.java:82)
14/10/16 15:32:26 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/10/16 15:32:26 INFO TaskSetManager: Starting task 1.0:0 as TID 1 on
executor localhost: localhost (PROCESS_LOCAL)
14/10/16 15:32:26 INFO TaskSetManager: Serialized task 1.0:0 as 2792 bytes
in 1 ms
14/10/16 15:32:26 INFO Executor: Running task ID 1
14/10/16 15:32:35 INFO Executor: Serialized size of result for 1 is 559
14/10/16 15:32:35 INFO Executor: Sending result for 1 directly to driver
14/10/16 15:32:35 INFO Executor: Finished task ID 1
14/10/16 15:32:35 INFO DAGScheduler: Completed ResultTask(1, 0)
14/10/16 15:32:35 INFO DAGScheduler: Stage 1 (foreach at
GenerateJSONContent.java:156) finished in 9.098 s
14/10/16 15:32:35 INFO SparkContext: Job finished: foreach at
GenerateJSONContent.java:156, took 9.112736939 s
14/10/16 15:32:35 INFO TaskSetManager: Finished TID 1 in 9095 ms on
localhost (progress: 1/1)
14/10/16 15:32:35 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks
have all completed, from pool
14/10/16 15:32:36 INFO SparkContext: Starting job: count at
GenerateJSONContent.java:145
14/10/16 15:32:36 INFO DAGScheduler: Got job 2 (count at
GenerateJSONContent.java:145) with 1 output partitions (allowLocal=false)
14/10/16 15:32:36 INFO DAGScheduler: Final stage: Stage 2(count at
GenerateJSONContent.java:145)
14/10/16 15:32:36 INFO DAGScheduler: Parents of final stage: List()
14/10/16 15:32:36 INFO DAGScheduler: Missing parents: List()
14/10/16 15:32:36 INFO DAGScheduler: Submitting Stage 2
(ParallelCollectionRDD[1] at parallelize at GenerateJSONContent.java:82),
which has no missing parents
14/10/16 15:32:36 INFO DAGScheduler: Submitting 1 missing tasks from Stage 2
(ParallelCollectionRDD[1] at parallelize at GenerateJSONContent.java:82)
14/10/16 15:32:36 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
14/10/16 15:32:36 INFO TaskSetManager: Starting task 2.0:0 as TID 2 on
executor localhost: localhost (PROCESS_LOCAL)
14/10/16 15:32:36 INFO TaskSetManager: Serialized task 2.0:0 as 2048 bytes
in 0 ms
14/10/16 15:32:36 INFO Executor: Running task ID 2
14/10/16 15:32:36 INFO Executor: Serialized size of result for 2 is 597
14/10/16 15:32:36 INFO Executor: Sending result for 2 directly to driver
14/10/16 15:32:36 INFO Executor: Finished task ID 2
14/10/16 15:32:36 INFO DAGScheduler: Completed ResultTask(2, 0)
14/10/16 15:32:36 INFO DAGScheduler: Stage 2 (count at
GenerateJSONContent.java:145) finished in 0.008 s
14/10/16 15:32:36 INFO SparkContext: Job finished: count at
GenerateJSONContent.java:145, took 0.025463494 s <<<<<< Loop 1 succeeds
14/10/16 15:32:36 INFO GenerateJSONContent: Running for tag :CrossRevisionId
14/10/16 15:32:36 INFO SparkContext: Starting job: foreach at
GenerateJSONContent.java:156
14/10/16 15:32:36 INFO DAGScheduler: Got job 3 (foreach at
GenerateJSONContent.java:156) with 1 output partitions (allowLocal=false)
14/10/16 15:32:36 INFO DAGScheduler: Final stage: Stage 3(foreach at
GenerateJSONContent.java:156)
14/10/16 15:32:36 INFO DAGScheduler: Parents of final stage: List()
14/10/16 15:32:36 INFO DAGScheduler: Missing parents: List()
14/10/16 15:32:36 INFO DAGScheduler: Submitting Stage 3
(ParallelCollectionRDD[1] at parallelize at GenerateJSONContent.java:82),
which has no missing parents
14/10/16 15:32:36 INFO TaskSetManager: Finished TID 2 in 15 ms on localhost
(progress: 1/1)
14/10/16 15:32:36 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks
have all completed, from pool
14/10/16 15:32:37 INFO DAGScheduler: Failed to run foreach at
GenerateJSONContent.java:156
Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task not serializable: java.io.NotSerializableException:
org.apache.hadoop.conf.Configuration
        at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
        at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
        at
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
        at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
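
The root cause is on the first line of the trace:
org.apache.hadoop.conf.Configuration does not implement
java.io.Serializable, so the moment the object graph that foreach ships to
the executors reaches a Configuration (here presumably through the HdfsUtil
set in the code below), the closure serializer aborts the stage. A common
workaround, sketched here under the assumption that HdfsUtil is what holds
the Configuration, is to mark the field transient and rebuild it lazily on
the executor:

    import java.io.Serializable;
    import org.apache.hadoop.conf.Configuration;

    public class HdfsUtil implements Serializable {
        // transient: excluded from the serialized task, so the closure
        // serializer never touches the non-serializable Configuration
        private transient Configuration conf;

        private Configuration getConf() {
            if (conf == null) {
                conf = new Configuration(); // rebuilt on the executor after deserialization
            }
            return conf;
        }
    }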


More code context follows: 
        ...
        Map<String, JavaRDD<String>> idLists = new HashMap<>();
        // Populate the map above based on string tags
        ...
        String filePathSuffix;
        GenerateJSONContentFunction jsonContent;
        switch (options.getType()) {
            case "opinions":
                jsonContent = (GenerateJSONContentFunction)
                        context.getBean("AssetOpinionsJsonContent");
                filePathSuffix = "/reviews/product/opinions/json/";
                esConf.setQueryString(
                        esUtil.getTagDetailsForBusinessQueryString(options.getBusinessId())
                );
                break;
            case "summary":
                jsonContent = (GenerateJSONContentFunction)
                        context.getBean("AssetSummaryJsonContent");
                filePathSuffix = "/reviews/product/summary/json/";
                break;
            case "consumer":
                jsonContent = (GenerateJSONContentFunction)
                        context.getBean("ConsumerProfileJsonContent");
                filePathSuffix = "/consumers/profiles/json/";
                break;
            default:
                throw new RuntimeException("Invalid type parameter passed. "
                        + "Allowed values are 'opinions', 'summary', 'consumer'.");
        }

        jsonContent.setHdfsUtil(hdfsUtil);

        for (String tagName : idLists.keySet()) {
            if (idLists.get(tagName).count() == 0) {
                continue;
            }
            LOG.info("Running for tag :" + tagName);

            String runId = UUID.randomUUID().toString();
            String filePath = "/static_output/" + runId + "/"
                    + businessIdHash + "/" + locale + filePathSuffix;
            jsonContent.setTagName(tagName);
            jsonContent.setOutputPath(filePath);

            // jsonContent (and everything it references, e.g. hdfsUtil) is
            // serialized and shipped to the executors on each foreach call
            idLists.get(tagName).foreach(jsonContent);
            ...
        }
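
One illustrative addition (not part of the original code): since foreach
java-serializes the whole jsonContent object graph on every call, the same
serialization can be attempted on the driver to fail fast. Run right before
each foreach, it would also show whether the graph only becomes
non-serializable after the first iteration, which is what the logs above
suggest:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;

    // Hypothetical helper: mimics what Spark's closure serializer does to
    // the function object, throwing NotSerializableException naming the
    // offending class.
    public final class SerializationCheck {
        public static void assertSerializable(Object function) throws IOException {
            try (ObjectOutputStream out =
                         new ObjectOutputStream(new ByteArrayOutputStream())) {
                out.writeObject(function);
            }
        }
    }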



