Re: Accumulators : Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
It works fine on my *Spark 1.1.0*.

Thanks
Best Regards

On Mon, Oct 27, 2014 at 12:22 AM, octavian.ganea octavian.ga...@inf.ethz.ch wrote:

Hi Akhil, Please see this related message: http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-td17263.html I am curious if this works for you also.
Re: Accumulators : Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
Just tried the code below and it works for me; I'm not sure why the SparkContext is being sent inside the mapPartitions function in your case. Can you try a simple map() instead of mapPartitions?

  val ac = sc.accumulator(0)
  val or = sc.parallelize(1 to 1)
  val ps = or.map(x => (x, x + 2)).map(x => ac += 1)
  val test = ps.collect()
  println(ac.value)

Thanks
Best Regards

On Sun, Oct 26, 2014 at 1:44 AM, octavian.ganea octavian.ga...@inf.ethz.ch wrote:
(original post and stack trace quoted in full; see the message below.)
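For what it's worth, the most common way a SparkContext ends up inside a task closure is when the accumulator is a field of an object that also holds the context: serializing the closure then drags in the whole enclosing instance. A minimal sketch of that failure mode and the usual local-copy workaround (the class and method names are illustrative, not from the original post):

  // Illustrative sketch only -- not the poster's code.
  class StatsJob(val sc: org.apache.spark.SparkContext) {
    val accum = sc.accumulator(0) // a field: using it in a closure captures `this`,
                                  // and `this` holds the non-serializable sc

    def countPartitions(rdd: org.apache.spark.rdd.RDD[String]): Int = {
      val localAccum = accum      // copy into a local val: the closure now captures
                                  // only the serializable accumulator, not `this`
      rdd.foreachPartition(_ => localAccum += 1)
      localAccum.value
    }
  }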
Re: Accumulators : Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
Hi Akhil,

Please see this related message: http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-td17263.html

I am curious if this works for you also.
Accumulators : Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
Hi all,

I tried to use accumulators, without any success so far. My code is simple:

  val sc = new SparkContext(conf)
  val accum = sc.accumulator(0)
  val partialStats = sc.textFile(f.getAbsolutePath())
    .map(line => { val key = line.split("\t").head; (key, line) })
    .groupByKey(128)
    .mapPartitions { iter => accum += 1; foo(iter) }
    .reduce(_ + _)
  println(accum.value)

Now, if I remove the 'accum += 1', everything works fine. If I keep it, I get this weird error:

  14/10/25 21:58:56 INFO TaskSchedulerImpl: Cancelling stage 0
  Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:890)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$1.apply(DAGScheduler.scala:887)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$1.apply(DAGScheduler.scala:887)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:887)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:886)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:886)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1204)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Can someone please help! Thank you!
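For anyone trying to reproduce this, here is a trimmed, self-contained variant of the pattern above that can be pasted into spark-shell (where sc is predefined); foo and the input file are replaced with illustrative stand-ins, so whether it reproduces the error will depend on the Spark version and on how the real surrounding code captures things:

  // Minimal stand-in for the code above; `foo` is replaced with a trivial count.
  val accum = sc.accumulator(0)
  val partialStats = sc.parallelize(Seq("a\t1", "b\t2", "a\t3"))
    .map(line => { val key = line.split("\t").head; (key, line) })
    .groupByKey(4)
    .mapPartitions { iter => accum += 1; iter.map(_._2.size) } // stand-in for foo(iter)
    .reduce(_ + _)
  println(accum.value)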
Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
Which code did you use? Is it caused by your own code or by something in Spark itself?

On Tue, Jul 22, 2014 at 8:50 AM, hsy...@gmail.com hsy...@gmail.com wrote:
I have the same problem

On Sat, Jul 19, 2014 at 12:31 AM, lihu lihu...@gmail.com wrote:
(original post quoted in full; see the message below.)

--
Best Wishes!
Li Hu (李浒) | Graduate Student
Institute for Interdisciplinary Information Sciences (IIIS, http://iiis.tsinghua.edu.cn/)
Tsinghua University, China
Email: lihu...@gmail.com
Tel: +86 15120081920
Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
You can set the Java option -Dsun.io.serialization.extendedDebugInfo=true to have more information about the offending object printed. It will help you trace down how the SparkContext is getting included in some kind of closure.

TD

On Thu, Jul 24, 2014 at 9:48 AM, lihu lihu...@gmail.com wrote:
(earlier messages quoted in full; see the rest of the thread.)
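As a hedged sketch of how that option might be applied (the property is read when java.io.ObjectOutputStream is first loaded, so a JVM flag is the more reliable route; the spark-submit flag below assumes a Spark 1.x deployment):

  // Either pass the flag to the driver JVM, e.g. with spark-submit:
  //   spark-submit --driver-java-options "-Dsun.io.serialization.extendedDebugInfo=true" ...
  // or set the property as early as possible in the driver, before any
  // serialization happens (this only works if ObjectOutputStream has not
  // been loaded yet):
  object EnableSerializationDebug {
    def main(args: Array[String]): Unit = {
      System.setProperty("sun.io.serialization.extendedDebugInfo", "true")
      // ... create the SparkContext and run the job from here ...
    }
  }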
Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
I have the same problem

On Sat, Jul 19, 2014 at 12:31 AM, lihu lihu...@gmail.com wrote:
(original post quoted in full; see the message below.)
Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
Hi, Everyone.

I have the following piece of code. When I run it, I get the error below. It seems that the SparkContext is not serializable, but I do not use the SparkContext for anything except the broadcast. (In fact, this code is in MLlib; I just try to broadcast the centerArrays.) It succeeds at the reduceByKey operation but fails at the collect operation, which confuses me.

  INFO DAGScheduler: Failed to run collect at KMeans.scala:235
  [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
  org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)

  private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {
    @transient val sc = data.sparkContext // I tried adding the @transient annotation here, but it doesn't work
    // Initialize each run's center to a random point
    val seed = new XORShiftRandom().nextInt()
    val sample = data.takeSample(true, runs, seed).toSeq
    val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
    // On each step, sample 2 * k points on average for each run with probability proportional
    // to their squared distance from that run's current centers
    for (step <- 0 until initializationSteps) {
      val centerArrays = sc.broadcast(centers.map(_.toArray))
      val sumCosts = data.flatMap { point =>
        for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays.value(r), point))
      }.reduceByKey(_ + _).collectAsMap() // can pass at this point
      val chosen = data.mapPartitionsWithIndex { (index, points) =>
        val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
        for {
          p <- points
          r <- 0 until runs
          if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r), p) * 2 * k / sumCosts(r)
        } yield (r, p)
      }.collect() // failed at this point
      for ((r, p) <- chosen) {
        centers(r) += p
      }
    }
    // (rest of the method omitted in the original post)
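A general debugging aid for this class of error, as a standalone sketch (plain java.io, not a Spark API): serialize the suspect closure yourself on the driver before handing it to an RDD operation, so the NotSerializableException surfaces at the line that builds the closure instead of deep inside the DAGScheduler. This mirrors the check Spark itself performs when it serializes a task:

  import java.io.{ByteArrayOutputStream, ObjectOutputStream}

  // Throws NotSerializableException (naming the offending object, with a full
  // reference path if extendedDebugInfo is enabled) whenever anything reachable
  // from `closure` -- e.g. an enclosing object holding the SparkContext --
  // cannot be serialized.
  def ensureSerializable(closure: AnyRef): Unit = {
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    try oos.writeObject(closure) finally oos.close()
  }

  // Usage: check the function literal before passing it to the RDD.
  val f = (iter: Iterator[String]) => iter.map(_.length)
  ensureSerializable(f) // fails fast here instead of inside the scheduler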