[ https://issues.apache.org/jira/browse/SPARK-11422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Abbass Marouni updated SPARK-11422:
-----------------------------------
    Description:

A Spark Streaming job (with checkpointing enabled) that does a simple transform operation on a JavaDStream, unioning each batch with a JavaRDD, runs correctly the first time:

{code}
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

public class Driver2 {

    public static final String CHKPTDIR = "/tmp/spark/checkPointDir";

    public static void main(String[] args) {
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
                try {
                    return createContext();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return null;
            }
        };

        // Create a new context, or recover one from the checkpoint directory
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(CHKPTDIR, factory);

        // Start our streaming context and wait for it to "finish"
        jssc.start();

        // Wait for the job to finish
        jssc.awaitTermination();
    }

    public static class f2 implements Function<JavaRDD<String>, JavaRDD<String>> {

        private static final long serialVersionUID = 1L;

        // Driver-side RDD captured by the transform closure
        private JavaRDD<String> ref_rdd;

        public f2(JavaRDD<String> ref_rdd) {
            this.ref_rdd = ref_rdd;
        }

        @Override
        public JavaRDD<String> call(JavaRDD<String> v1) throws Exception {
            return v1.union(ref_rdd);
        }
    }

    private static JavaStreamingContext createContext() throws java.lang.Exception {
        final SparkConf conf = new SparkConf();
        conf.setMaster("local[*]");
        conf.setAppName("TEST APP");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        jssc.checkpoint(CHKPTDIR); // set checkpoint directory

        List<String> list01 = Arrays.asList("A", "B", "C", "D", "E");
        JavaRDD<String> staticRDD = jssc.sparkContext().parallelize(list01);

        List<String> list02 = Arrays.asList("F", "G", "H", "I", "J");
        JavaRDD<String> streamRDD = jssc.sparkContext().parallelize(list02);

        Queue<JavaRDD<String>> queue = new PriorityQueue<JavaRDD<String>>();
        queue.add(streamRDD);

        // Replay the same RDD over and over
        JavaDStream<String> inputStream = jssc.queueStream(queue, false, streamRDD);

        // Union the inputStream with the staticRDD
        JavaDStream<String> unionStream = inputStream.transform(new f2(staticRDD));
        unionStream.print();

        return jssc;
    }
}
{code}

When the job is stopped and then restarted from the checkpoint, it fails with the following exception:

{code}
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
    at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.union(RDD.scala:502)
    at org.apache.spark.api.java.JavaRDD.union(JavaRDD.scala:151)
    at fr.marouni.Driver2$f2.call(Driver2.java:61)
    at fr.marouni.Driver2$f2.call(Driver2.java:1)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$1(JavaDStreamLike.scala:334)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transform$1.apply(JavaDStreamLike.scala:335)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transform$1.apply(JavaDStreamLike.scala:335)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
    at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
    at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
    at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
    at fr.marouni.Driver2.main(Driver2.java:39)
{code}
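The failure appears to come from the checkpointed transform closure: f2 captures a driver-side JavaRDD (staticRDD), and after recovery from the checkpoint that RDD no longer has a live SparkContext, so the union() inside the closure trips the SPARK-5063 check. For comparison, below is a sketch of a restart-safe variant of the transform function (the SafeUnion name and the use of JavaSparkContext.fromSparkContext are illustrative, not part of the reported job): it captures only the reference data as a plain List and rebuilds the reference RDD from the batch RDD's own context inside call().

{code}
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

// Sketch of a restart-safe transform function: no JavaRDD is captured in the
// closure, only the underlying data, so checkpoint recovery has no stale RDD
// reference to deserialize.
public class SafeUnion implements Function<JavaRDD<String>, JavaRDD<String>> {

    private static final long serialVersionUID = 1L;

    // Plain data instead of an RDD reference; a List<String> serializes into
    // the checkpoint and survives a restart.
    private final List<String> refData;

    public SafeUnion(List<String> refData) {
        this.refData = refData;
    }

    @Override
    public JavaRDD<String> call(JavaRDD<String> batchRDD) throws Exception {
        // transform() runs on the driver at job-generation time, so it is legal
        // to rebuild the reference RDD here from the batch RDD's own context,
        // which is valid both on the first run and after a checkpoint restart.
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(batchRDD.context());
        return batchRDD.union(jsc.parallelize(refData));
    }
}
{code}

With this variant the driver would pass the list rather than the RDD, e.g. inputStream.transform(new SafeUnion(list01)), with the rest of the reproduction unchanged. This is only meant to illustrate the driver-side RDD capture that the exception complains about; it has not been verified against the affected versions.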
> Restarting a Spark Streaming Job from a checkpoint fails
> --------------------------------------------------------
>
>                 Key: SPARK-11422
>                 URL: https://issues.apache.org/jira/browse/SPARK-11422
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.3.1, 1.4.1, 1.5.1
>            Reporter: Abbass Marouni