1.: I will paste the full content of the Environment page of the example application running against the cluster at the end of this message.
2. and 3.: Following #2 I was able to see that the count was incorrectly 0 when running against the cluster, and following #3 I was able to get the message:

org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[4] at count at <console>:15(0) has different number of partitions than original RDD MappedRDD[3] at textFile at <console>:12(2)
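For reference, the test in step #3 can be sketched in spark-shell roughly as follows. The paths below are hypothetical placeholders (the exact commands aren't shown above); the key detail is that the checkpoint directory is local to the driver rather than on a filesystem the workers can see:

```scala
// Hypothetical spark-shell session approximating steps #2/#3.
// Paths are assumptions, not the actual ones used.
sc.setCheckpointDir("/tmp/spark-checkpoints")    // local dir, NOT shared across hosts

val rdd = sc.textFile("/home/pmogren/input.txt") // small text file, 2 partitions
rdd.checkpoint()                                  // mark the RDD for checkpointing
rdd.count()                                       // materializes the checkpoint; fails with
// SparkException: Checkpoint RDD ... has different number of partitions than original RDD ...
```

When the checkpoint directory is not visible to the executors, the checkpoint files are written (or found) on the wrong host, so the recovered CheckpointRDD comes back with 0 partitions instead of the original 2 - matching the error message above.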
I think I understand - state checkpoints and other file-exchange operations in a Spark cluster require a distributed/shared filesystem, even with just a single-host cluster and the driver/shell on a second host. Is that correct?

Thank you,
Paul

Environment page of the NetworkWordCumulativeCountUpdateStateByKey application UI:

Runtime Information
  Java Home: /usr/lib/jvm/jdk1.8.0/jre
  Java Version: 1.8.0 (Oracle Corporation)
  Scala Home:
  Scala Version: version 2.10.3

Spark Properties
  spark.app.name: NetworkWordCumulativeCountUpdateStateByKey
  spark.cleaner.ttl: 3600
  spark.deploy.recoveryMode: ZOOKEEPER
  spark.deploy.zookeeper.url: pubsub01:2181
  spark.driver.host: 10.10.41.67
  spark.driver.port: 37360
  spark.fileserver.uri: http://10.10.41.67:40368
  spark.home: /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2
  spark.httpBroadcast.uri: http://10.10.41.67:45440
  spark.jars: /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2/examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar
  spark.master: spark://10.10.41.19:7077

System Properties
  awt.toolkit: sun.awt.X11.XToolkit
  file.encoding: ANSI_X3.4-1968
  file.encoding.pkg: sun.io
  file.separator: /
  java.awt.graphicsenv: sun.awt.X11GraphicsEnvironment
  java.awt.printerjob: sun.print.PSPrinterJob
  java.class.version: 52.0
  java.endorsed.dirs: /usr/lib/jvm/jdk1.8.0/jre/lib/endorsed
  java.ext.dirs: /usr/lib/jvm/jdk1.8.0/jre/lib/ext:/usr/java/packages/lib/ext
  java.home: /usr/lib/jvm/jdk1.8.0/jre
  java.io.tmpdir: /tmp
  java.library.path:
  java.net.preferIPv4Stack: true
  java.runtime.name: Java(TM) SE Runtime Environment
  java.runtime.version: 1.8.0-b132
  java.specification.name: Java Platform API Specification
  java.specification.vendor: Oracle Corporation
  java.specification.version: 1.8
  java.vendor: Oracle Corporation
  java.vendor.url: http://java.oracle.com/
  java.vendor.url.bug: http://bugreport.sun.com/bugreport/
  java.version: 1.8.0
  java.vm.info: mixed mode
  java.vm.name: Java HotSpot(TM) 64-Bit Server VM
  java.vm.specification.name: Java Virtual Machine Specification
  java.vm.specification.vendor: Oracle Corporation
  java.vm.specification.version: 1.8
  java.vm.vendor: Oracle Corporation
  java.vm.version: 25.0-b70
  line.separator:
  log4j.configuration: conf/log4j.properties
  os.arch: amd64
  os.name: Linux
  os.version: 3.5.0-23-generic
  path.separator: :
  sun.arch.data.model: 64
  sun.boot.class.path: /usr/lib/jvm/jdk1.8.0/jre/lib/resources.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/rt.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/sunrsasign.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/jsse.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/jce.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/charsets.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/jfr.jar:/usr/lib/jvm/jdk1.8.0/jre/classes
  sun.boot.library.path: /usr/lib/jvm/jdk1.8.0/jre/lib/amd64
  sun.cpu.endian: little
  sun.cpu.isalist:
  sun.io.serialization.extendedDebugInfo: true
  sun.io.unicode.encoding: UnicodeLittle
  sun.java.command: org.apache.spark.streaming.examples.StatefulNetworkWordCount spark://10.10.41.19:7077 localhost 9999
  sun.java.launcher: SUN_STANDARD
  sun.jnu.encoding: ANSI_X3.4-1968
  sun.management.compiler: HotSpot 64-Bit Tiered Compilers
  sun.nio.ch.bugLevel:
  sun.os.patch.level: unknown
  user.country: US
  user.dir: /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2
  user.home: /home/pmogren
  user.language: en
  user.name: pmogren
  user.timezone: America/New_York

Classpath Entries (Resource - Source)
  /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar - System Classpath
  /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2/conf - System Classpath
  /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2/examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar - System Classpath
  http://10.10.41.67:40368/jars/spark-examples_2.10-assembly-0.9.0-incubating.jar - Added By User

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Monday, April 07, 2014 7:54 PM
To: user@spark.apache.org
Subject: Re: CheckpointRDD has different number of partitions than original RDD

A few things would be helpful.

1. Environment settings - you can find them on the Environment tab in the Spark application UI.

2. Are you setting the HDFS configuration correctly in your Spark program? For example, can you write an HDFS file from a Spark program (say, spark-shell) to your HDFS installation and read it back into Spark (i.e., create an RDD)? You can test this by writing an RDD as a text file from one shell and then trying to read it back from another shell.

3. If that works, then let's try explicitly checkpointing an RDD. To do this, take any RDD and do the following:

  myRDD.checkpoint()
  myRDD.count()

If there is some issue, then this should reproduce the above error.

TD

On Mon, Apr 7, 2014 at 3:48 PM, Paul Mogren <pmog...@commercehub.com> wrote:

Hello, Spark community! My name is Paul. I am a Spark newbie, evaluating version 0.9.0 without any Hadoop at all, and I need some help. I run into the following error with the StatefulNetworkWordCount example (and similarly in my prototype app, when I use the updateStateByKey operation). I get this when running against my small cluster, but not (so far) against local[2].
61904 [spark-akka.actor.default-dispatcher-2] ERROR org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1396905956000 ms.0
org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[310] at take at DStream.scala:586(0) has different number of partitions than original RDD MapPartitionsRDD[309] at mapPartitions at StateDStream.scala:66(2)
        at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:99)
        at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:989)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:855)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:870)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:884)
        at org.apache.spark.rdd.RDD.take(RDD.scala:844)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:586)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:585)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:155)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:744)

Please let me know what other information would be helpful; I didn't find any question submission guidelines.

Thanks,
Paul
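If the shared-filesystem hypothesis at the top of this thread is right, the corresponding fix for the streaming case would be to point the checkpoint directory at storage visible to both the driver and every worker. A minimal sketch, assuming an HDFS namenode address that is a placeholder (an NFS mount visible on all hosts should also work):

```scala
// Sketch: StreamingContext with a checkpoint directory on a shared filesystem.
// The namenode host/port is an assumption - substitute your own HDFS URL.
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(
  "spark://10.10.41.19:7077",                     // cluster master from the thread above
  "NetworkWordCumulativeCountUpdateStateByKey",
  Seconds(1))

// Checkpoint location reachable from the driver host AND all worker hosts.
ssc.checkpoint("hdfs://namenode:8020/spark/checkpoints")
```

With a driver-local path such as /tmp, updateStateByKey's periodic checkpoints would be written where the workers cannot read them back, which is consistent with the partition-count mismatch reported above.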