[ https://issues.apache.org/jira/browse/SPARK-11422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Abbass Marouni updated SPARK-11422:
-----------------------------------
    Summary: Spark Streaming job which does a transformation involving a DStream and an RDD cannot be restarted from a checkpoint  (was: Restarting a Spark Streaming Job from a checkpoint fails)

> Spark Streaming job which does a transformation involving a DStream and an RDD cannot be restarted from a checkpoint
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-11422
>                 URL: https://issues.apache.org/jira/browse/SPARK-11422
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.3.1, 1.4.1, 1.5.1
>            Reporter: Abbass Marouni
>
> A Spark Streaming job (with checkpointing enabled) that does a simple transform operation on a JavaDStream, unioning it with a JavaRDD, runs correctly the first time:
> {code}
> import java.util.Arrays;
> import java.util.List;
> import java.util.PriorityQueue;
> import java.util.Queue;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
>
> public class Driver2 {
>
>     public static final String CHKPTDIR = "/tmp/spark/checkPointDir";
>
>     public static void main(String[] args) {
>         JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
>             @Override
>             public JavaStreamingContext create() {
>                 try {
>                     return createContext();
>                 } catch (Exception e) {
>                     e.printStackTrace();
>                 }
>                 return null;
>             }
>         };
>         // Restores the context from the checkpoint if one exists,
>         // otherwise builds a fresh one via the factory
>         JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(CHKPTDIR, factory);
>         // Start our streaming context and wait for it to "finish"
>         jssc.start();
>         // Wait for the job to finish
>         jssc.awaitTermination();
>     }
>
>     public static class f2 implements Function<JavaRDD<String>, JavaRDD<String>> {
>
>         private static final long serialVersionUID = 1L;
>
>         // RDD created outside the DStream operation; this reference is
>         // serialized into the checkpoint along with the closure
>         private final JavaRDD<String> ref_rdd;
>
>         public f2(JavaRDD<String> ref_rdd) {
>             this.ref_rdd = ref_rdd;
>         }
>
>         @Override
>         public JavaRDD<String> call(JavaRDD<String> v1) throws Exception {
>             return v1.union(ref_rdd);
>         }
>     }
>
>     private static JavaStreamingContext createContext() throws java.lang.Exception {
>         final SparkConf conf = new SparkConf();
>         conf.setMaster("local[*]");
>         conf.setAppName("TEST APP");
>         JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
>         jssc.checkpoint(CHKPTDIR); // set checkpoint directory
>         List<String> list01 = Arrays.asList("A", "B", "C", "D", "E");
>         JavaRDD<String> staticRDD = jssc.sparkContext().parallelize(list01);
>         List<String> list02 = Arrays.asList("F", "G", "H", "I", "J");
>         JavaRDD<String> streamRDD = jssc.sparkContext().parallelize(list02);
>         // A single offer into a PriorityQueue never triggers a comparison,
>         // so the non-Comparable JavaRDD happens to work here
>         Queue<JavaRDD<String>> queue = new PriorityQueue<JavaRDD<String>>();
>         queue.add(streamRDD);
>         // replay the same RDD over and over
>         JavaDStream<String> inputStream = jssc.queueStream(queue, false, streamRDD);
>         // Union the inputStream with the staticRDD
>         JavaDStream<String> unionStream = inputStream.transform(new f2(staticRDD));
>         unionStream.print();
>         return jssc;
>     }
> }
> {code}
> The first run succeeds because transform's function executes on the driver, where the captured staticRDD still holds a live SparkContext. The job fails with the following exception when it is stopped and restarted:
> {code}
> org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
>       at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>       at org.apache.spark.rdd.RDD.union(RDD.scala:502)
>       at org.apache.spark.api.java.JavaRDD.union(JavaRDD.scala:151)
>       at fr.marouni.Driver2$f2.call(Driver2.java:61)
>       at fr.marouni.Driver2$f2.call(Driver2.java:1)
>       at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$1(JavaDStreamLike.scala:334)
>       at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transform$1.apply(JavaDStreamLike.scala:335)
>       at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transform$1.apply(JavaDStreamLike.scala:335)
>       at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
>       at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
>       at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
>       at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
>       at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
>       at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>       at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>       at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>       at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>       at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>       at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>       at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>       at scala.Option.orElse(Option.scala:257)
>       at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>       at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>       at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>       at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>       at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>       at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>       at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>       at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>       at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
>       at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
>       at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>       at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>       at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
>       at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
>       at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
>       at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
>       at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>       at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
>       at fr.marouni.Driver2.main(Driver2.java:39)
> {code}
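> The top frame (the SPARK-5063 guard at RDD.scala:87) points at the root cause: an RDD's reference to its SparkContext does not survive serialization, so the staticRDD that was checkpointed inside the f2 closure is restored without a context, and the first union on it throws. A common workaround is to capture only the plain data and rebuild the reference RDD from the batch RDD's own live context at each interval; transform's function runs on the driver, so creating an RDD there is legal. A minimal sketch of that approach, assuming the reference data is small enough to re-parallelize every batch (UnionWithStaticData and refData are illustrative names, not from the reported code):
> {code}
> import java.util.List;
>
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.Function;
>
> // Captures a plain List instead of a pre-built RDD, so nothing holding a
> // SparkContext reference is serialized into the checkpoint.
> public class UnionWithStaticData implements Function<JavaRDD<String>, JavaRDD<String>> {
>
>     private static final long serialVersionUID = 1L;
>
>     private final List<String> refData; // plain data survives checkpointing
>
>     public UnionWithStaticData(List<String> refData) {
>         this.refData = refData;
>     }
>
>     @Override
>     public JavaRDD<String> call(JavaRDD<String> batch) throws Exception {
>         // Rebuild the reference RDD from the batch RDD's live context, which
>         // is valid both on the first run and after a checkpoint restore
>         JavaSparkContext jsc = JavaSparkContext.fromSparkContext(batch.context());
>         return batch.union(jsc.parallelize(refData));
>     }
> }
> {code}
> With that in place, createContext() would hand the transform the raw list instead of the pre-built RDD, e.g. inputStream.transform(new UnionWithStaticData(list01)), so the restored closure no longer references an RDD with a stale context and the restart path should not trip the guard.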


