Yes, I tried ssc.checkpoint("checkpoint"); it works for me as long as I don't use reduceByKeyAndWindow.
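For context, here is a minimal sketch of the shape of my job (the socket source, the word-count logic, and the window durations are illustrative stand-ins, not my exact code; the real job writes results out with saveToCassandra):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowedCounts {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowedCounts")
    val ssc = new StreamingContext(conf, Seconds(1))

    // reduceByKeyAndWindow with an inverse function requires checkpointing
    ssc.checkpoint("checkpoint")

    // Illustrative source; the real job reads from a different stream
    val pairs = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    val counts = pairs.reduceByKeyAndWindow(
      _ + _,        // add counts entering the window
      _ - _,        // subtract counts leaving the window (inverse function)
      Seconds(30),  // window duration
      Seconds(10))  // slide duration

    counts.print()  // the real job calls saveToCassandra here instead

    ssc.start()
    ssc.awaitTermination()
  }
}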
As soon as I start using reduceByKeyAndWindow, it fails with an "Invalid checkpoint directory" SparkException. The stack trace is below:

Exception in thread "main" org.apache.spark.SparkException: Invalid checkpoint directory: file:/home/ubuntu/checkpoint/342e3171-01f3-48$ 2-97be-e3862eb5c944/rdd-8
    at org.apache.spark.rdd.CheckpointRDD.getPartitions(CheckpointRDD.scala:54)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:97)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1415)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1468)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1483)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1504)
    at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1.apply(DStreamFunctions.scala:33)
    at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1.apply(DStreamFunctions.scala:33)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

On Tue, Sep 22, 2015 at 6:49 PM, Adrian Tanase <atan...@adobe.com> wrote:
> Have you tried simply ssc.checkpoint("checkpoint")? This should create it
> in the local folder; it has always worked for me when developing in local
> mode.
>
> For the others (/tmp/...), make sure you have rights to write there.
>
> -adrian
>
> From: srungarapu vamsi
> Date: Tuesday, September 22, 2015 at 7:59 AM
> To: user
> Subject: Invalid checkpoint url
>
> I am using reduceByKeyAndWindow (with an inverse reduce function) in my
> code. In order to use this, it seems the checkpoint directory I pass in
> has to be on a Hadoop-compatible file system.
> Does that mean I should set up Hadoop on my system?
> I googled this and found in a Stack Overflow answer that I need not set
> up HDFS, but the checkpoint directory should be HDFS-compatible.
>
> I am a beginner in this area. I am running my Spark Streaming application
> on Ubuntu 14.04 with Spark 1.3.1.
> If I need not set up HDFS and ext4 is HDFS-compatible, then what should
> my checkpoint directory look like?
>
> I tried all of these:
> ssc.checkpoint("/tmp/checkpoint")
> ssc.checkpoint("hdfs:///tmp/checkpoint")
> ssc.checkpoint("file:///tmp/checkpoint")
>
> But none of them worked for me.
>
> --
> /Vamsi

--
/Vamsi
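P.S. To rule out the permissions concern Adrian raised for /tmp, I could create the directory up front and pass its file: URI explicitly. A minimal sketch (local mode only; the path is the one from my earlier attempts):

import java.nio.file.{Files, Paths}

// Ensure the local checkpoint directory exists and is writable before
// handing it to the StreamingContext; this only rules out a missing
// directory or a permissions problem, it does not change Spark's behaviour.
val dir = Paths.get("/tmp/checkpoint")
if (!Files.exists(dir)) Files.createDirectories(dir)
ssc.checkpoint(dir.toUri.toString) // e.g. file:///tmp/checkpoint/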