Re: mapWithState and context start when checkpoint exists
Thank you! What would be the best alternative for simulating a stream for testing purposes, e.g. from a sequence or a text file? In production I'll use Kafka as the source, but locally I want to mock it. Worst case, I'll set up and tear down a Kafka cluster in the tests, but I think a mock would be faster.

--
Andrey Yegorov

On Mon, Jan 25, 2016 at 1:26 PM, Shixiong(Ryan) Zhu wrote:
> Hey Andrey,
>
> `ConstantInputDStream` doesn't support checkpointing because it contains an RDD
> field. It cannot resume from checkpoints.
>
> On Mon, Jan 25, 2016 at 1:12 PM, Andrey Yegorov wrote:
>> [...]
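One possible checkpoint-friendly replacement for `ConstantInputDStream` in tests is a small custom receiver that replays a fixed sequence. This is a sketch, not a tested recipe: it assumes Spark 1.6's `org.apache.spark.streaming.receiver.Receiver` API (`onStart`, `onStop`, `store`, `isStopped`), and the class name `SeqReceiver` and the replay interval are hypothetical. Because the receiver holds only a serializable `Seq` rather than an RDD, the DStream graph built on it should, as far as I understand, be restorable from a checkpoint.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical test-only source: replays `data` every `pauseMs` milliseconds.
// It holds only a serializable Seq (no RDD), so it can be written into a checkpoint.
class SeqReceiver(data: Seq[String], pauseMs: Long)
    extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  override def onStart(): Unit = {
    // onStart must not block, so the replay loop runs on its own daemon thread.
    val worker = new Thread("seq-receiver") {
      override def run(): Unit = {
        try {
          while (!isStopped()) {
            data.foreach(item => store(item))
            Thread.sleep(pauseMs)
          }
        } catch {
          case _: InterruptedException => () // the receiver is shutting down
        }
      }
    }
    worker.setDaemon(true)
    worker.start()
  }

  // Nothing to clean up: the loop above exits once isStopped() returns true.
  override def onStop(): Unit = ()
}
```

In `createContext`, the `ConstantInputDStream` line would then become something like `val testStream = ssc.receiverStream(new SeqReceiver(input, 1000L))`. A receiver permanently occupies one core, so `local[2]` (as in the original code) is the minimum for running it locally.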
Re: mapWithState and context start when checkpoint exists
Hey Andrey,

`ConstantInputDStream` doesn't support checkpointing because it contains an RDD field. It cannot resume from checkpoints.

On Mon, Jan 25, 2016 at 1:12 PM, Andrey Yegorov wrote:
> [...]
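Why an RDD field breaks recovery can be illustrated with a simplified model in plain Scala (no Spark needed). The model assumes, based on Spark 1.x's `RDD` class, that an RDD keeps its `SparkContext` in a `@transient` field and throws the SPARK-5063 message when that field is null; `FakeContext`, `FakeRdd`, `FakeConstantStream` and `CheckpointModel` are hypothetical stand-ins. A checkpoint write and restore is modeled as a Java-serialization round trip: the data survives, the context reference does not.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Stand-in for SparkContext: driver-only and not serializable.
class FakeContext

// Stand-in for an RDD: like Spark's RDD, it keeps its context in a @transient field.
class FakeRdd(@transient val ctx: FakeContext, val data: Seq[String]) extends Serializable {
  // Mirrors the null-context check that produces the SPARK-5063 error in Spark 1.x.
  def count(): Int = {
    if (ctx == null) throw new IllegalStateException("RDD has no context after deserialization")
    data.size
  }
}

// Stand-in for ConstantInputDStream: holds the RDD as an ordinary field.
class FakeConstantStream(val rdd: FakeRdd) extends Serializable

object CheckpointModel {
  // Models writing an object to a checkpoint and reading it back via Java serialization.
  def roundTrip[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      // Resolve classes with this code's own loader so the demo also works in scripts and REPLs.
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, getClass.getClassLoader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

After `CheckpointModel.roundTrip`, the restored stream still has its `data`, but its RDD's `ctx` is null and `count()` throws, which is the same shape of failure as `ssc2.start()` in the original report.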
mapWithState and context start when checkpoint exists
Hi,

I am new to Spark (and Scala) and hope someone can help me with an issue I got stuck on while experimenting and learning.

mapWithState from Spark 1.6 seems to be a great fit for the task I want to implement, but unfortunately I get the error "RDD transformations and actions can only be invoked by the driver, not inside of other transformations" when the job restarts and a checkpoint already exists. The job starts and works OK if the checkpoint directory is empty (which kind of defeats the point of having a checkpoint).

I can reproduce it with ~65 lines of test code, see below. Is there something I am doing wrong?

code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Durations, StreamingContext, _}

object TestJob {
  def stateFunc(id: String,
                txt: Option[Iterable[String]],
                state: State[String]): (String, Long) = {
    if (txt.nonEmpty) {
      // mkString joins this batch's values; appending `txt` itself would add "Some(CompactBuffer(...))"
      val aggregatedString = state.getOption().getOrElse("") + txt.get.mkString
      state.update(aggregatedString)
      (id, aggregatedString.length.toLong)
    } else { // happens when state is timing out? any other cases?
      (id, 0L)
    }
  }

  def createContext(checkpointDirectory: String): StreamingContext = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("test")

    val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
    ssc.checkpoint(checkpointDirectory)

    val input = Seq("1", "21", "321", "41", "42", "543", "67")
    val inputRdd = ssc.sparkContext.parallelize(input)
    val testStream = new ConstantInputDStream(ssc, inputRdd)

    // key each record by its first character
    val streamWithIds = testStream.map(x => (x.substring(0, 1), x))
    val batched = streamWithIds.groupByKey()

    val stateSpec = StateSpec.function(stateFunc _)
      .numPartitions(3)
      .timeout(Minutes(3 * 60 * 24)) // 3 days

    val result = batched.mapWithState(stateSpec)
    result.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    val checkpointDirectory = com.google.common.io.Files.createTempDir()
    checkpointDirectory.deleteOnExit()
    val checkpointDirectoryName = checkpointDirectory.getAbsolutePath

    val ssc = StreamingContext.getOrCreate(checkpointDirectoryName,
      () => {
        createContext(checkpointDirectoryName)
      })

    ssc.start()
    ssc.awaitTerminationOrTimeout(7000)
    ssc.stop()
    Thread.sleep(5000)

    val ssc2 = StreamingContext.getOrCreate(checkpointDirectoryName,
      () => {
        createContext(checkpointDirectoryName)
      })

    // terminates here with
    // Exception in thread "main" org.apache.spark.SparkException: RDD transformations and actions
    // can only be invoked by the driver, not inside of other transformations; for example,
    // rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and
    // count action cannot be performed inside of the rdd1.map transformation. For more
    // information, see SPARK-5063.
    ssc2.start()
    ssc2.awaitTerminationOrTimeout(7000)
    ssc2.stop()
  }
}

--
Andrey Yegorov
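Separately from the checkpoint problem, the state logic can be unit-tested without any streaming source if it is pulled out into a pure function. The sketch below is a hypothetical refactoring (`StateLogic.aggregate` is not a Spark API): Spark's `State[String]` is replaced by the previous state passed in as an `Option` and the new state returned as an `Option`. It also shows why concatenating the `Option[Iterable[String]]` directly (`... + txt`) is a pitfall: it appends the Option's `toString`, e.g. `Some(List(41))`, rather than the values themselves.

```scala
// Hypothetical pure version of stateFunc: Spark's State[String] is replaced by
// `previous` (the current state, if any) and the returned Option (the state to store).
object StateLogic {
  def aggregate(id: String,
                previous: Option[String],
                txt: Option[Iterable[String]]): (Option[String], (String, Long)) =
    txt match {
      case Some(values) =>
        // mkString joins the batch's values; `... + txt` would append "Some(...)" instead.
        val aggregated = previous.getOrElse("") + values.mkString
        (Some(aggregated), (id, aggregated.length.toLong))
      case None =>
        // No new value: in Spark 1.6 this is the call made when the key's state times out,
        // and a timing-out state must not be updated, so no new state is returned.
        (None, (id, 0L))
    }
}
```

Inside `stateFunc`, the Spark-facing part would then shrink to calling `aggregate(id, state.getOption(), txt)`, passing any returned new state to `state.update`, and returning the output tuple.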