Sorry, I'll furnish some details below. However, union is not an option for the business logic I have: the function generates a specific file based on a variable passed in through the function's setter, and that variable changes with each RDD. I have annotated the log line where the first loop iteration succeeds.
Log output to give you some context:

...
14/10/16 15:32:25 INFO SparkContext: Starting job: count at GenerateJSONContent.java:145
14/10/16 15:32:25 INFO DAGScheduler: Got job 0 (count at GenerateJSONContent.java:145) with 1 output partitions (allowLocal=false)
14/10/16 15:32:25 INFO DAGScheduler: Final stage: Stage 0(count at GenerateJSONContent.java:145)
14/10/16 15:32:25 INFO DAGScheduler: Parents of final stage: List()
14/10/16 15:32:25 INFO DAGScheduler: Missing parents: List()
14/10/16 15:32:25 INFO DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[2] at parallelize at GenerateJSONContent.java:82), which has no missing parents
14/10/16 15:32:25 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (ParallelCollectionRDD[2] at parallelize at GenerateJSONContent.java:82)
14/10/16 15:32:25 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/10/16 15:32:25 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/10/16 15:32:25 INFO TaskSetManager: Serialized task 0.0:0 as 2048 bytes in 3 ms
14/10/16 15:32:25 INFO Executor: Running task ID 0
14/10/16 15:32:25 INFO Executor: Fetching http://172.16.1.204:42232/jars/rickshaw-spark-0.0.1-SNAPSHOT.jar with timestamp 1413473544838
14/10/16 15:32:25 INFO Utils: Fetching http://172.16.1.204:42232/jars/rickshaw-spark-0.0.1-SNAPSHOT.jar to /tmp/fetchFileTemp7432615579770034188.tmp
14/10/16 15:32:26 INFO Executor: Adding file:/tmp/spark-a471145f-ab44-447b-b48c-bb499024d756/rickshaw-spark-0.0.1-SNAPSHOT.jar to class loader
14/10/16 15:32:26 INFO Executor: Serialized size of result for 0 is 597
14/10/16 15:32:26 INFO Executor: Sending result for 0 directly to driver
14/10/16 15:32:26 INFO Executor: Finished task ID 0
14/10/16 15:32:26 INFO DAGScheduler: Completed ResultTask(0, 0)
14/10/16 15:32:26 INFO DAGScheduler: Stage 0 (count at GenerateJSONContent.java:145) finished in 0.791 s
14/10/16 15:32:26 INFO SparkContext: Job finished: count at GenerateJSONContent.java:145, took 0.889171151 s
14/10/16 15:32:26 INFO GenerateJSONContent: Running for tag :PublicationProductId
14/10/16 15:32:26 INFO TaskSetManager: Finished TID 0 in 778 ms on localhost (progress: 1/1)
14/10/16 15:32:26 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/10/16 15:32:26 INFO SparkContext: Starting job: foreach at GenerateJSONContent.java:156
14/10/16 15:32:26 INFO DAGScheduler: Got job 1 (foreach at GenerateJSONContent.java:156) with 1 output partitions (allowLocal=false)
14/10/16 15:32:26 INFO DAGScheduler: Final stage: Stage 1(foreach at GenerateJSONContent.java:156)
14/10/16 15:32:26 INFO DAGScheduler: Parents of final stage: List()
14/10/16 15:32:26 INFO DAGScheduler: Missing parents: List()
14/10/16 15:32:26 INFO DAGScheduler: Submitting Stage 1 (ParallelCollectionRDD[2] at parallelize at GenerateJSONContent.java:82), which has no missing parents
14/10/16 15:32:26 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (ParallelCollectionRDD[2] at parallelize at GenerateJSONContent.java:82)
14/10/16 15:32:26 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/10/16 15:32:26 INFO TaskSetManager: Starting task 1.0:0 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
14/10/16 15:32:26 INFO TaskSetManager: Serialized task 1.0:0 as 2792 bytes in 1 ms
14/10/16 15:32:26 INFO Executor: Running task ID 1
14/10/16 15:32:35 INFO Executor: Serialized size of result for 1 is 559
14/10/16 15:32:35 INFO Executor: Sending result for 1 directly to driver
14/10/16 15:32:35 INFO Executor: Finished task ID 1
14/10/16 15:32:35 INFO DAGScheduler: Completed ResultTask(1, 0)
14/10/16 15:32:35 INFO DAGScheduler: Stage 1 (foreach at GenerateJSONContent.java:156) finished in 9.098 s
14/10/16 15:32:35 INFO SparkContext: Job finished: foreach at GenerateJSONContent.java:156, took 9.112736939 s
14/10/16 15:32:35 INFO TaskSetManager: Finished TID 1 in 9095 ms on localhost (progress: 1/1)
14/10/16 15:32:35 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/10/16 15:32:36 INFO SparkContext: Starting job: count at GenerateJSONContent.java:145
14/10/16 15:32:36 INFO DAGScheduler: Got job 2 (count at GenerateJSONContent.java:145) with 1 output partitions (allowLocal=false)
14/10/16 15:32:36 INFO DAGScheduler: Final stage: Stage 2(count at GenerateJSONContent.java:145)
14/10/16 15:32:36 INFO DAGScheduler: Parents of final stage: List()
14/10/16 15:32:36 INFO DAGScheduler: Missing parents: List()
14/10/16 15:32:36 INFO DAGScheduler: Submitting Stage 2 (ParallelCollectionRDD[1] at parallelize at GenerateJSONContent.java:82), which has no missing parents
14/10/16 15:32:36 INFO DAGScheduler: Submitting 1 missing tasks from Stage 2 (ParallelCollectionRDD[1] at parallelize at GenerateJSONContent.java:82)
14/10/16 15:32:36 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
14/10/16 15:32:36 INFO TaskSetManager: Starting task 2.0:0 as TID 2 on executor localhost: localhost (PROCESS_LOCAL)
14/10/16 15:32:36 INFO TaskSetManager: Serialized task 2.0:0 as 2048 bytes in 0 ms
14/10/16 15:32:36 INFO Executor: Running task ID 2
14/10/16 15:32:36 INFO Executor: Serialized size of result for 2 is 597
14/10/16 15:32:36 INFO Executor: Sending result for 2 directly to driver
14/10/16 15:32:36 INFO Executor: Finished task ID 2
14/10/16 15:32:36 INFO DAGScheduler: Completed ResultTask(2, 0)
14/10/16 15:32:36 INFO DAGScheduler: Stage 2 (count at GenerateJSONContent.java:145) finished in 0.008 s
14/10/16 15:32:36 INFO SparkContext: Job finished: count at GenerateJSONContent.java:145, took 0.025463494 s   <<<<<< Loop 1 succeeds
14/10/16 15:32:36 INFO GenerateJSONContent: Running for tag :CrossRevisionId
14/10/16 15:32:36 INFO SparkContext: Starting job: foreach at GenerateJSONContent.java:156
14/10/16 15:32:36 INFO DAGScheduler: Got job 3 (foreach at GenerateJSONContent.java:156) with 1 output partitions (allowLocal=false)
14/10/16 15:32:36 INFO DAGScheduler: Final stage: Stage 3(foreach at GenerateJSONContent.java:156)
14/10/16 15:32:36 INFO DAGScheduler: Parents of final stage: List()
14/10/16 15:32:36 INFO DAGScheduler: Missing parents: List()
14/10/16 15:32:36 INFO DAGScheduler: Submitting Stage 3 (ParallelCollectionRDD[1] at parallelize at GenerateJSONContent.java:82), which has no missing parents
14/10/16 15:32:36 INFO TaskSetManager: Finished TID 2 in 15 ms on localhost (progress: 1/1)
14/10/16 15:32:36 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
14/10/16 15:32:37 INFO DAGScheduler: Failed to run foreach at GenerateJSONContent.java:156
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

More code context follows:

...
Map<String, JavaRDD<String>> idLists = new HashMap<>();
// Populate the map above based on string tags
...
String filePathSuffix = null;
GenerateJSONContentFunction jsonContent;
switch (options.getType()) {
    case "opinions":
        jsonContent = (GenerateJSONContentFunction) context.getBean("AssetOpinionsJsonContent");
        filePathSuffix += "/reviews/product/opinions/json/";
        esConf.setQueryString(esUtil.getTagDetailsForBusinessQueryString(options.getBusinessId()));
        break;
    case "summary":
        jsonContent = (GenerateJSONContentFunction) context.getBean("AssetSummaryJsonContent");
        filePathSuffix += "/reviews/product/summary/json/";
        break;
    case "consumer":
        jsonContent = (GenerateJSONContentFunction) context.getBean("ConsumerProfileJsonContent");
        filePathSuffix += "/consumers/profiles/json/";
        break;
    default:
        throw new RuntimeException("Invalid type parameter passed. Current values allows ('opinions','summary','consumer').");
}
jsonContent.setHdfsUtil(hdfsUtil);
for (String tagName : idLists.keySet()) {
    if (idLists.get(tagName).count() == 0) {
        continue;
    }
    LOG.info("Running for tag :" + tagName);
    String runId = UUID.randomUUID().toString();
    String filePath = "/static_output/" + runId + "/" + businessIdHash + "/" + locale + filePathSuffix;
    jsonContent.setTagName(tagName);
    jsonContent.setOutputPath(filePath);
    idLists.get(tagName).foreach(jsonContent);
...
}

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-an-action-inside-a-loop-across-multiple-RDDs-java-io-NotSerializableException-tp16580p16597.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
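P.S. For completeness, here is a minimal standalone sketch of the usual workaround for this kind of failure: mark the non-serializable field transient and rebuild it lazily on the executor side. All class names below (FakeConfiguration, JsonContentFunction, TransientConfDemo) are stand-ins rather than the real rickshaw-spark code, and FakeConfiguration merely plays the role of org.apache.hadoop.conf.Configuration; the sketch uses plain Java serialization to mimic what Spark does when it ships the foreach function to an executor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for org.apache.hadoop.conf.Configuration (which is not Serializable).
class FakeConfiguration {
    String get(String key) { return "value-for-" + key; }
}

// The function stays Serializable because the non-serializable field is
// transient and is rebuilt lazily in whichever JVM deserializes the object.
class JsonContentFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    private String tagName;                    // ordinary state: serialized as usual
    private transient FakeConfiguration conf;  // NOT serialized; rebuilt per JVM

    void setTagName(String tagName) { this.tagName = tagName; }

    private FakeConfiguration conf() {
        if (conf == null) {
            conf = new FakeConfiguration();    // recreated after deserialization
        }
        return conf;
    }

    String call(String id) {
        return tagName + ":" + id + ":" + conf().get("fs.defaultFS");
    }
}

public class TransientConfDemo {
    public static void main(String[] args) throws Exception {
        JsonContentFunction fn = new JsonContentFunction();
        fn.setTagName("PublicationProductId");

        // Round-trip through Java serialization, as Spark does when shipping
        // the function to an executor. Without 'transient' on the conf field,
        // writeObject would throw NotSerializableException, matching the logs.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(fn);
        oos.close();
        ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        JsonContentFunction copy = (JsonContentFunction) in.readObject();

        // tagName survived the trip; the transient conf is rebuilt on first use.
        System.out.println(copy.call("42"));  // PublicationProductId:42:value-for-fs.defaultFS
    }
}
```

In the real job the candidate field would be whatever Configuration reference HdfsUtil (or the esConf wiring) keeps alive inside GenerateJSONContentFunction; other options along the same lines are wrapping the Configuration in Spark's SerializableWritable or constructing it inside the function body.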