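Hey Andrey, `ConstantInputDStream` doesn't support checkpointing because it holds an RDD in a field. That RDD gets written into the checkpoint, and when the streaming context is recovered, the deserialized RDD no longer has a SparkContext attached, so the first transformation on it fails with the SPARK-5063 error you are seeing. A job built on `ConstantInputDStream` therefore cannot resume from a checkpoint.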
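One possible workaround, sketched here under assumptions: keep only the serializable input data in the DStream and build a fresh RDD in `compute()` for every batch, so no RDD ends up in the checkpoint. `ConstantSeqInputDStream` is a hypothetical name, not a Spark class. The sketch relies on the `start`/`stop`/`compute` hooks of `InputDStream` and on `DStream.context` as they exist in Spark 1.6, and it has not been verified against a real checkpoint restart.

```scala
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical stand-in for ConstantInputDStream. It stores the input as a plain
// Seq (which is serialized into the checkpoint, so T must be Serializable) and
// never keeps an RDD as a field.
class ConstantSeqInputDStream[T: ClassTag](ssc_ : StreamingContext, data: Seq[T])
  extends InputDStream[T](ssc_) { // ssc_ is only passed to the superclass, not stored

  // Nothing to start or stop: there is no external source.
  override def start(): Unit = {}
  override def stop(): Unit = {}

  // Build a new RDD for each batch via `context`, which Spark re-attaches to the
  // DStream graph when the StreamingContext is recovered from a checkpoint.
  override def compute(validTime: Time): Option[RDD[T]] =
    Some(context.sparkContext.parallelize(data))
}
```

In `createContext` it would replace the `parallelize`/`ConstantInputDStream` pair, e.g. `val testStream = new ConstantSeqInputDStream(ssc, input)`. The constructor argument is deliberately used only in the `extends` clause so that the `StreamingContext` is not captured as a field of the class and serialized along with it.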
On Mon, Jan 25, 2016 at 1:12 PM, Andrey Yegorov <andrey.yego...@gmail.com> wrote:

> Hi,
>
> I am new to spark (and scala) and hope someone can help me with the issue
> I got stuck on in my experiments/learning.
>
> mapWithState from spark 1.6 seems to be a great way for the task I want to
> implement with spark but unfortunately I am getting error "RDD
> transformations and actions can only be invoked by the driver, not inside
> of other transformations" on job restart when checkpoint already exists.
> Job starts and works ok if checkpoint is empty (this kind of defeats the
> point of having the checkpoint).
>
> I can reproduce it with ~65 lines of test code, see below.
> Is there something that I am doing wrong there?
>
> code:
> ----
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.dstream.ConstantInputDStream
> import org.apache.spark.streaming.{Durations, StreamingContext, _}
>
> object TestJob {
>   def stateFunc(id: String,
>                 txt: Option[Iterable[String]],
>                 state: State[String]) : (String, Long) = {
>     if (txt.nonEmpty) {
>       val aggregatedString = state.getOption().getOrElse("") + txt
>       state.update(aggregatedString)
>       (id, aggregatedString.length)
>     } else { // happens when state is timing out? any other cases?
>       (id, 0)
>     }
>   }
>
>   def createContext(checkpointDirectory: String): StreamingContext = {
>     val sparkConf = new SparkConf().setMaster("local[2]").setAppName("test")
>
>     val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
>     ssc.checkpoint(checkpointDirectory)
>
>     val input = Seq("1", "21", "321", "41", "42", "543", "67")
>     val inputRdd = ssc.sparkContext.parallelize(input)
>     val testStream = new ConstantInputDStream(ssc, inputRdd)
>
>     val streamWithIds = testStream.map(x => (x.substring(0,1), x))
>     val batched = streamWithIds.groupByKey()
>
>     val stateSpec = StateSpec.function(stateFunc _)
>       .numPartitions(3)
>       .timeout(Minutes(3 * 60 * 24)) // 3 days
>
>     val result = batched.mapWithState(stateSpec)
>     result.print
>     ssc
>   }
>
>   def main(args: Array[String]): Unit = {
>     val checkpointDirectory = com.google.common.io.Files.createTempDir()
>     checkpointDirectory.deleteOnExit()
>     val checkpointDirectoryName = checkpointDirectory.getAbsolutePath
>
>     val ssc = StreamingContext.getOrCreate(checkpointDirectoryName,
>       () => {
>         createContext(checkpointDirectoryName)
>       })
>
>     ssc.start()
>     ssc.awaitTerminationOrTimeout(7000)
>     ssc.stop()
>     Thread.sleep(5000)
>
>     val ssc2 = StreamingContext.getOrCreate(checkpointDirectoryName,
>       () => {
>         createContext(checkpointDirectoryName)
>       })
>
>     // terminates here with
>     // Exception in thread "main" org.apache.spark.SparkException: RDD
>     // transformations and actions can only be invoked by the driver, not inside
>     // of other transformations; for example, rdd1.map(x => rdd2.values.count() *
>     // x) is invalid because the values transformation and count action cannot be
>     // performed inside of the rdd1.map transformation. For more information, see
>     // SPARK-5063.
>     ssc2.start()
>     ssc2.awaitTerminationOrTimeout(7000)
>     ssc2.stop()
>   }
> }
>
> ----------
> Andrey Yegorov