Re: Invalid checkpoint url
@Das, No, I am getting it in cluster mode. I think I understand why I am getting this error; please correct me if I am wrong.

Reason: checkpointing writes RDDs to disk, and this happens on all workers. Whenever Spark has to read an RDD back, the checkpoint directory must be reachable by all the workers, i.e. a common place they can all write to and read from. That calls for a commonly accessible file system such as NFS, HDFS or S3. So if I give ssc.checkpoint("some local directory"), the workers cannot read RDDs from another worker's checkpoint directory, and I get the error mentioned above.

With this understanding, I am creating a t2.medium node running HDFS 2.7.1 and pointing the checkpoint directory to "hdfs://ip:port/path/to/directory".

Please correct me if I am wrong.

On Wed, Sep 23, 2015 at 4:53 AM, Tathagata Das wrote:
> Are you getting this error in local mode?
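The reasoning above boils down to one question: can every worker and the driver reach the checkpoint directory? As a minimal sketch, the hypothetical helper `CheckpointDirCheck.isShared` (not a Spark API) answers it from the directory string alone. It assumes that the `hdfs` and `s3*` schemes name a shared store, and that a bare or `file:` path is only shared when it sits under a mount the caller declares as shared (for example an NFS export). It only inspects the string; it cannot confirm that a mount really is shared.

```scala
import java.net.URI

object CheckpointDirCheck {
  // Assumption for this sketch: these schemes name a filesystem that the
  // driver and every executor can reach. Adjust to your deployment.
  val sharedSchemes: Set[String] = Set("hdfs", "s3", "s3n", "s3a")

  /** Hypothetical helper: true if `dir` looks reachable from all workers.
    * A bare path or a file: URI lives on each machine's own disk, so it is
    * only treated as shared when it sits under one of `sharedMounts`
    * (for example an NFS mount point that exists on every node).
    */
  def isShared(dir: String, sharedMounts: Seq[String] = Nil): Boolean = {
    val uri = new URI(dir)
    Option(uri.getScheme).map(_.toLowerCase) match {
      case Some(s) if sharedSchemes.contains(s) => true
      case None | Some("file") =>
        val path = Option(uri.getPath).getOrElse("")
        sharedMounts.exists(m => path.startsWith(m))
      case _ => false
    }
  }

  def main(args: Array[String]): Unit = {
    val candidates = Seq(
      "checkpoint",
      "file:///tmp/checkpoint",
      "hdfs://namenode:8020/spark/checkpoint", // hypothetical namenode host/port
      "/mnt/nfs/checkpoint"                    // hypothetical NFS mount
    )
    for (d <- candidates)
      println(s"$d -> ${isShared(d, sharedMounts = Seq("/mnt/nfs/"))}")
  }
}
```

With `sharedMounts = Seq("/mnt/nfs/")`, the relative `checkpoint` directory and `file:///tmp/checkpoint` are reported as not shared, while the HDFS URI and the NFS path are reported as shared, which matches the conclusion that a local directory only works in local mode.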
Re: Invalid checkpoint url
Are you getting this error in local mode?

On Tue, Sep 22, 2015 at 7:34 AM, srungarapu vamsi wrote:
> Yes, I tried ssc.checkpoint("checkpoint"), it works for me as long as i don't use reduceByKeyAndWindow.
Re: Invalid checkpoint url
Bingo! That is the problem. The solution is now obvious I presume :)

On Tue, Sep 22, 2015 at 9:41 PM, srungarapu vamsi wrote:
> @Das,
> No, i am getting in the cluster mode.
Re: Invalid checkpoint url
Have you tried simply ssc.checkpoint("checkpoint")? This should create it in the local folder; it has always worked for me during development in local mode. For the others (/tmp/..), make sure you have permission to write there.

-adrian

From: srungarapu vamsi
Date: Tuesday, September 22, 2015 at 7:59 AM
To: user
Subject: Invalid checkpoint url

I am using reduceByKeyAndWindow (with an inverse reduce function) in my code. To use it, it seems the checkpoint directory has to be on a Hadoop-compatible file system. Does that mean I should set up Hadoop on my system? I googled this and found a Stack Overflow answer saying that I need not set up HDFS, but the checkpoint directory should be HDFS-compatible.

I am a beginner in this area. I am running my Spark Streaming application on Ubuntu 14.04 with Spark 1.3.1. If I need not set up HDFS and ext4 is HDFS-compatible, what should my checkpoint directory look like? I tried all of these:

ssc.checkpoint("/tmp/checkpoint")
ssc.checkpoint("hdfs:///tmp/checkpoint")
ssc.checkpoint("file:///tmp/checkpoint")

But none of them worked for me.

--
/Vamsi
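What a checkpoint directory "looks like" depends on how the path gets qualified. The error later in this thread shows ssc.checkpoint("checkpoint") ending up as file:/home/ubuntu/checkpoint, i.e. a path without a scheme was resolved against the local default filesystem and the working directory. The sketch below approximates that rule in plain Scala; `qualify` is hypothetical and only loosely mirrors how Hadoop fills a missing scheme or authority from `fs.defaultFS` (it is not Hadoop's code). The working directory `/home/ubuntu` is inferred from the error message, and `namenode:8020` is a made-up host and port.

```scala
import java.net.URI

object CheckpointPathResolution {
  /** Rough approximation (assumption, not Hadoop's implementation):
    *  - a path with no scheme takes the scheme and authority of `defaultFs`;
    *  - a path whose scheme matches `defaultFs` but has no authority
    *    (e.g. hdfs:///tmp) borrows the default authority;
    *  - a relative path is resolved against `workingDir`.
    */
  def qualify(dir: String, defaultFs: String, workingDir: String): String = {
    val uri = new URI(dir)
    val fs = new URI(defaultFs)
    val scheme = Option(uri.getScheme).getOrElse(fs.getScheme)
    val authority = Option(uri.getAuthority).orElse(
      if (uri.getScheme == null || uri.getScheme == fs.getScheme) Option(fs.getAuthority)
      else None
    )
    val rawPath = uri.getPath
    val path =
      if (rawPath.startsWith("/")) rawPath
      else workingDir.stripSuffix("/") + "/" + rawPath
    authority match {
      case Some(a) => s"$scheme://$a$path"
      case None    => s"$scheme:$path" // no host: e.g. file:/home/ubuntu/checkpoint
    }
  }
}
```

Under these assumptions, `qualify("checkpoint", "file:///", "/home/ubuntu")` gives `file:/home/ubuntu/checkpoint`, the same form as in the error. `qualify("hdfs:///tmp/checkpoint", "file:///", "/home/ubuntu")` gives `hdfs:/tmp/checkpoint` with no namenode host, which is one plausible reason that variant fails when no HDFS is configured as the default filesystem; with `defaultFs = "hdfs://namenode:8020"` it becomes `hdfs://namenode:8020/tmp/checkpoint`.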
Re: Invalid checkpoint url
Yes, I tried ssc.checkpoint("checkpoint"); it works for me as long as I don't use reduceByKeyAndWindow.

When I start using reduceByKeyAndWindow, it fails with the error "Exception in thread "main" org.apache.spark.SparkException: Invalid checkpoint directory: file:/home/ubuntu/checkpoint/342e3171-01f3-48$ 2-97be-e3862eb5c944/rdd-8"

The stack trace is as below:

Exception in thread "main" org.apache.spark.SparkException: Invalid checkpoint directory: file:/home/ubuntu/checkpoint/342e3171[22/9706$ 2-97be-e3862eb5c944/rdd-8
    at org.apache.spark.rdd.CheckpointRDD.getPartitions(CheckpointRDD.scala:54)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:97)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1415)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1468)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1483)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1504)
    at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1.apply(DStreamFunctions.scala:33)
    at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1.apply(DStreamFunctions.scala:33)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at
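Why does the problem only show up with reduceByKeyAndWindow? The Spark Streaming programming guide states that checkpointing must be enabled for the variant that takes an inverse reduce function, because each window is computed incrementally from the previous window's result: add the batch entering the window, subtract the batch leaving it. The stdlib-only sketch below illustrates that rule with per-key counts in in-memory `Map`s instead of RDDs; `InverseWindowSketch` and its methods are hypothetical names, and dropping keys whose count reaches zero is a choice of this sketch. Because every window depends on the one before it, Spark periodically saves that state to the checkpoint directory, and the trace above fails exactly when that saved RDD is read back (`CheckpointRDD.getPartitions`).

```scala
object InverseWindowSketch {
  type Counts = Map[String, Int]

  // "Reduce" step (+): merge a batch that entered the window.
  def reduce(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v) }

  // "Inverse reduce" step (-): remove a batch that slid out of the window,
  // dropping keys whose count falls to zero (a choice made by this sketch).
  def inverseReduce(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (acc, (k, v)) =>
      val n = acc.getOrElse(k, 0) - v
      if (n == 0) acc - k else acc.updated(k, n)
    }

  /** Totals for a window of `windowLength` batches, sliding one batch at a time.
    * Each result is derived from the previous one:
    *   next = inverseReduce(reduce(previous, entering), leaving)
    * instead of re-reducing every batch in the window.
    */
  def slidingTotals(batches: Seq[Counts], windowLength: Int): Seq[Counts] = {
    require(windowLength > 0, "window must span at least one batch")
    batches.indices.scanLeft(Map.empty[String, Int]) { (prev, i) =>
      val added = reduce(prev, batches(i))
      if (i >= windowLength) inverseReduce(added, batches(i - windowLength)) else added
    }.tail
  }
}
```

For batches `{a:1}`, `{a:2, b:1}`, `{b:3}`, `{a:1}` and a window of two batches, the incremental results are `{a:1}`, `{a:3, b:1}`, `{a:2, b:4}`, `{a:1, b:3}`, the same as summing each window from scratch. The saving comes from reusing the previous result, which is also why that result has to live somewhere the whole cluster can read it.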