Re: kafka + Spark Streaming with checkPointing fails to restart
-BEGIN PGP SIGNED MESSAGE- Hash: SHA1 Thanks everyone, that was the problem. the "create new streaming context" function was supposed to setup the stream processing as well as the checkpoint directory. I had missed the whole process of checkpoint setup. With that done, everything works as expected. For the benefit of others, my final version of the code that works looks like this and it works correctly: object RawLogProcessor extends Logging { import TacomaHelper._ val checkpointDir = "/tmp/checkpointDir_tacoma" var ssc: Option[StreamingContext] = None def createSparkConf(config: Config): SparkConf = { val sparkConf = new SparkConf() config.entrySet.asScala .map(kv => kv.getKey -> kv.getValue) .foreach { case (k, v) => sparkConf.set(s"spark.$k", unquote(v.render())) } sparkConf.registerKryoClasses(Array(classOf[VideoView], classOf[RawLog], classOf[VideoEngagement], classOf[VideoImpression])) sparkConf } // a function that returns a function of type: `() => StreamingContext ` def createContext(sparkConfig: Config, kafkaConf: Config)(f: StreamingContext => StreamingContext) = () => { val batchDurationSecs = sparkConfig.getDuration("streaming.batch_duration", TimeUnit.SECONDS) val sparkConf = createSparkConf(sparkConfig) // calculate sparkContext and streamingContext val streamingContext = new StreamingContext(sparkConf, Durations.seconds(batchDurationSecs)) streamingContext.checkpoint(checkpointDir) // apply the streaming context function to the function f(streamingContext) } def createNewContext(sparkConf: Config, kafkaConf: Config, f: StreamingContext => StreamingContext) = { logInfo("Create new Spark streamingContext with provided pipeline function") StreamingContext.getOrCreate( checkpointPath = checkpointDir, creatingFunc = createContext(sparkConf, kafkaConf)(f), createOnError = true) } def apply(sparkConfig: Config, kafkaConf: Config): StreamingContext = { rawlogTopic = kafkaConf.getString("rawlog.topic") kafkaParams = kafkaConf.entrySet.asScala .map(kv => 
kv.getKey -> unquote(kv.getValue.render())) .toMap if (ssc.isEmpty) { ssc = Some(createNewContext(sparkConfig, kafkaConf, setupPipeline) ) } ssc.get } var rawlogTopic: String = "qa-rawlog" var kafkaParams: Map[String, String] = Map() def setupPipeline(streamingContext: StreamingContext): StreamingContext = { logInfo("Creating new kafka rawlog stream") // TODO: extract this and pass it around somehow val rawlogDStream = KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](streamingContext, kafkaParams, Set(rawlogTopic)) logInfo("adding step to parse kafka stream into RawLog types (Normalizer)") val eventStream = rawlogDStream .map({ case (key, rawlogVal) => val record = rawlogVal.asInstanceOf[GenericData.Record] val rlog = RawLog.newBuilder() .setId(record.get("id").asInstanceOf[String]) .setAccount(record.get("account").asInstanceOf[String]) .setEvent(record.get("event").asInstanceOf[String]) .setTimestamp(record.get("timestamp").asInstanceOf[Long]) .setUserAgent(record.get("user_agent").asInstanceOf[String]) .setParams(record.get("params").asInstanceOf[java.util.Map[String, String]]) .build() val norm = Normalizer(rlog) (key, rlog.getEvent, norm) }) logInfo("Adding step to filter out VideoView only events and cache them") val videoViewStream = eventStream .filter(_._2 == "video_view") .filter(_._3.isDefined) .map((z) => (z._1, z._3.get)) .map((z) => (z._1, z._2.asInstanceOf[VideoView])) .cache() // repartition by account logInfo("repartition videoView by account and calculate stats") videoViewStream.map((v) => (v._2.getAccount, 1)) .filter(_._1 != null) .window(Durations.seconds(20)) .reduceByKey(_ + _) .print() // repartition by (deviceType, DeviceOS) logInfo("repartition videoView by (DeviceType, DeviceOS) and calculate stats") videoViewStream.map((v) => ((v._2.getDeviceType, v._2.getDeviceOs), 1)) .reduceByKeyAndWindow(_ + _, Durations.seconds(10)) .print() streamingContext } } - - Ankur On 13/05/2015 23:52, NB wrote: > The data 
pipeline (DAG) should not be added to the StreamingContext > in the case of a recovery scenario. The pipeline metadata is > recovered from the checkpoint folder. That is one thing you will > need to fix in your code. Also, I don't think the > ssc.checkpoint(folder) call should be made in case of the > recovery. > > The idiom to follow is to set up the DAG in the creatingFunc and > not outside of it. This will ensure that if a new context is being > created i.e. checkpoint folder does not exist, the DAG will get > added to it and then checkpointed. Once a recovery happens, this > function
Re: kafka + Spark Streaming with checkPointing fails to restart
The data pipeline (DAG) should not be added to the StreamingContext in the case of a recovery scenario. The pipeline metadata is recovered from the checkpoint folder. That is one thing you will need to fix in your code. Also, I don't think the ssc.checkpoint(folder) call should be made in the case of a recovery. The idiom to follow is to set up the DAG in the creatingFunc and not outside of it. This will ensure that if a new context is being created, i.e. the checkpoint folder does not exist, the DAG will get added to it and then checkpointed. Once a recovery happens, this function is not invoked but everything is recreated from the checkpointed data. Hope this helps, NB -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/kafka-Spark-Streaming-with-checkPointing-fails-to-restart-tp22864p22878.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: kafka + Spark Streaming with checkPointing fails to restart
cala) > > The relavant source is: > > class RawLogProcessor(ssc: StreamingContext, topic: String, > kafkaParams: Map[String, String]) { > // create kafka stream > val rawlogDStream = KafkaUtils.createDirectStream[String, Object, > StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Set(topic)) > //KafkaUtils.createStream[String, Object, StringDecoder, > KafkaAvroDecoder](ssc, kafkaParams, Map("qa-rawlogs" -> 10), > StorageLevel.MEMORY_AND_DISK_2) > > val eventStream = rawlogDStream >.map({ > case (key, rawlogVal) => >val record = rawlogVal.asInstanceOf[GenericData.Record] >val rlog = RawLog.newBuilder() > .setId(record.get("id").asInstanceOf[String]) > .setAccount(record.get("account").asInstanceOf[String]) > .setEvent(record.get("event").asInstanceOf[String]) > .setTimestamp(record.get("timestamp").asInstanceOf[Long]) > .setUserAgent(record.get("user_agent").asInstanceOf[String]) > > .setParams(record.get("params").asInstanceOf[java.util.Map[String, > String]]) > .build() >val norm = Normalizer(rlog) >(key, rlog.getEvent, norm) >}) > > val videoViewStream = eventStream >.filter(_._2 == "video_view") >.filter(_._3.isDefined) >.map((z) => (z._1, z._3.get)) >.map((z) => (z._1, z._2.asInstanceOf[VideoView])) >.cache() > > // repartition by (deviceType, DeviceOS) > val deviceTypeVideoViews = videoViewStream.map((v) => > ((v._2.getDeviceType, v._2.getDeviceOs), 1)) >.reduceByKeyAndWindow(_ + _, Durations.seconds(10)) >.print() > } > > object RawLogProcessor extends Logging { > > /** > * If str is surrounded by quotes it return the content between the > quotes > */ > def unquote(str: String) = { >if (str != null && str.length >= 2 && str.charAt(0) == '\"' && > str.charAt(str.length - 1) == '\"') > str.substring(1, str.length - 1) >else > str > } > > val checkpointDir = "/tmp/checkpointDir_tacoma" > var sparkConfig: Config = _ > var ssc: StreamingContext = _ > var processor: Option[RawLogProcessor] = None > > val createContext: () => StreamingContext = () => { >val 
batchDurationSecs = > sparkConfig.getDuration("streaming.batch_duration", TimeUnit.SECONDS) >val sparkConf = new SparkConf() >sparkConf.registerKryoClasses(Array(classOf[VideoView], > classOf[RawLog], classOf[VideoEngagement], classOf[VideoImpression])) >sparkConfig.entrySet.asScala > .map(kv => kv.getKey -> kv.getValue) > .foreach { >case (k, v) => > val value = unquote(v.render()) > > logInfo(s"spark.$k = $value") > > sparkConf.set(s"spark.$k", value) > } > >// calculate sparkContext and streamingContext >new StreamingContext(sparkConf, Durations.seconds(batchDurationSecs)) > } > > def createProcessor(sparkConf: Config, kafkaConf: Config): > RawLogProcessor = { >sparkConfig = sparkConf >ssc = StreamingContext.getOrCreate(checkpointPath = checkpointDir, > creatingFunc = createContext, createOnError = true) >ssc.checkpoint(checkpointDir) >// kafkaProperties >val kafkaParams = kafkaConf.entrySet.asScala > .map(kv => kv.getKey -> unquote(kv.getValue.render())) > .toMap > >logInfo(s"Initializing kafkaParams = $kafkaParams") >// create processor > new RawLogProcessor(ssc, kafkaConf.getString("rawlog.topic"), > kafkaParams) > } > > def apply(sparkConfig: Config, kafkaConf: Config) = { >if (processor.isEmpty) { > processor = Some(createProcessor(sparkConfig, kafkaConf)) >} >processor.get > } > > def start() = { >ssc.start() >ssc.awaitTermination() > } > > } > > Extended logs: > https://gist.githubusercontent.com/ankurcha/f35df63f0d8a99da0be4/raw/ec9 > 6b932540ac87577e4ce8385d26699c1a7d05e/spark-console.log > > Could someone tell me what it causes this problem? I tried looking at > the stacktrace but I am not very familiar with the codebase to make > solid assertions. > Any ideas as to what may be happening here. 
> > --- Ankur Chauhan > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/kafka-Spark-Streaming-with-checkPointing-fails-to-restart-tp22864.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
kafka + Spark Streaming with checkPointing fails to restart
tanceOf[java.util.Map[String, String]]) .build() val norm = Normalizer(rlog) (key, rlog.getEvent, norm) }) val videoViewStream = eventStream .filter(_._2 == "video_view") .filter(_._3.isDefined) .map((z) => (z._1, z._3.get)) .map((z) => (z._1, z._2.asInstanceOf[VideoView])) .cache() // repartition by (deviceType, DeviceOS) val deviceTypeVideoViews = videoViewStream.map((v) => ((v._2.getDeviceType, v._2.getDeviceOs), 1)) .reduceByKeyAndWindow(_ + _, Durations.seconds(10)) .print() } object RawLogProcessor extends Logging { /** * If str is surrounded by quotes it return the content between the quotes */ def unquote(str: String) = { if (str != null && str.length >= 2 && str.charAt(0) == '\"' && str.charAt(str.length - 1) == '\"') str.substring(1, str.length - 1) else str } val checkpointDir = "/tmp/checkpointDir_tacoma" var sparkConfig: Config = _ var ssc: StreamingContext = _ var processor: Option[RawLogProcessor] = None val createContext: () => StreamingContext = () => { val batchDurationSecs = sparkConfig.getDuration("streaming.batch_duration", TimeUnit.SECONDS) val sparkConf = new SparkConf() sparkConf.registerKryoClasses(Array(classOf[VideoView], classOf[RawLog], classOf[VideoEngagement], classOf[VideoImpression])) sparkConfig.entrySet.asScala .map(kv => kv.getKey -> kv.getValue) .foreach { case (k, v) => val value = unquote(v.render()) logInfo(s"spark.$k = $value") sparkConf.set(s"spark.$k", value) } // calculate sparkContext and streamingContext new StreamingContext(sparkConf, Durations.seconds(batchDurationSecs)) } def createProcessor(sparkConf: Config, kafkaConf: Config): RawLogProcessor = { sparkConfig = sparkConf ssc = StreamingContext.getOrCreate(checkpointPath = checkpointDir, creatingFunc = createContext, createOnError = true) ssc.checkpoint(checkpointDir) // kafkaProperties val kafkaParams = kafkaConf.entrySet.asScala .map(kv => kv.getKey -> unquote(kv.getValue.render())) .toMap logInfo(s"Initializing kafkaParams = $kafkaParams") // create processor new 
RawLogProcessor(ssc, kafkaConf.getString("rawlog.topic"), kafkaParams) } def apply(sparkConfig: Config, kafkaConf: Config) = { if (processor.isEmpty) { processor = Some(createProcessor(sparkConfig, kafkaConf)) } processor.get } def start() = { ssc.start() ssc.awaitTermination() } } Extended logs: https://gist.githubusercontent.com/ankurcha/f35df63f0d8a99da0be4/raw/ec96b932540ac87577e4ce8385d26699c1a7d05e/spark-console.log Could someone tell me what causes this problem? I tried looking at the stacktrace but I am not familiar enough with the codebase to make solid assertions. Any ideas as to what may be happening here? --- Ankur Chauhan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/kafka-Spark-Streaming-with-checkPointing-fails-to-restart-tp22864.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
kafka + Spark Streaming with checkPointing fails to restart
-BEGIN PGP SIGNED MESSAGE- Hash: SHA1 Hi, I have a simple application which fails with the following exception only when the application is restarted (i.e. the checkpointDir has entries from a previous execution): Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ShuffledDStream@2264e43c has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at 
org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222) at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:90) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512) at com.brightcove.analytics.tacoma.RawLogProcessor$.start(RawLogProcessor.scala:115) at com.brightcove.analytics.tacoma.Main$delayedInit$body.apply(Main.scala:15) at scala.Function0$class.apply$mcV$sp(Function0.scala:40) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) at scala.App$class.main(App.scala:71) at com.brightcove.analytics.tacoma.Main$.main(Main.scala:5) at com.brightcove.analytics.tacoma.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) The relevant source is: class RawLogProcessor(ssc: StreamingContext, topic: String, kafkaParams: Map[String, String]) { // create kafka stream val rawlogDStream = KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Set(topic)) 
//KafkaUtils.createStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Map("qa-rawlogs" -> 10), StorageLevel.MEMORY_AND_DISK_2) val eventStream = rawlogDStream .map({ case (key, rawlogVal) => val record = rawlogVal.asInstanceOf[GenericData.Record] val rlog = RawLog.newBuilder() .setId(record.get("id").asInstanceOf[String]) .setAccount(record.get("account").asInstanceOf[String]) .setEvent(record.get("event").asInstanceOf[String]) .setTimestamp(record.get("timestamp").asInstanceOf[Long]) .setUserAgent(record.get("user_agent").asInstanceOf[String]) .setParams(record.get("params").asInstanceOf[java.util.Map[String, String]])
kafka + Spark Streaming with checkPointing fails to restart
-BEGIN PGP SIGNED MESSAGE- Hash: SHA1 Hi, I have a simple application which fails with the following exception only when the application is restarted (i.e. the checkpointDir has entries from a previous execution): Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ShuffledDStream@2264e43c has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at 
org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222) at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:90) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512) at com.brightcove.analytics.tacoma.RawLogProcessor$.start(RawLogProcessor.scala:115) at com.brightcove.analytics.tacoma.Main$delayedInit$body.apply(Main.scala:15) at scala.Function0$class.apply$mcV$sp(Function0.scala:40) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) at scala.App$class.main(App.scala:71) at com.brightcove.analytics.tacoma.Main$.main(Main.scala:5) at com.brightcove.analytics.tacoma.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) The relevant source is: class RawLogProcessor(ssc: StreamingContext, topic: String, kafkaParams: Map[String, String]) { // create kafka stream val rawlogDStream = KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Set(topic)) 
//KafkaUtils.createStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Map("qa-rawlogs" -> 10), StorageLevel.MEMORY_AND_DISK_2) val eventStream = rawlogDStream .map({ case (key, rawlogVal) => val record = rawlogVal.asInstanceOf[GenericData.Record] val rlog = RawLog.newBuilder() .setId(record.get("id").asInstanceOf[String]) .setAccount(record.get("account").asInstanceOf[String]) .setEvent(record.get("event").asInstanceOf[String]) .setTimestamp(record.get("timestamp").asInstanceOf[Long]) .setUserAgent(record.get("user_agent").asInstanceOf[String]) .setParams(record.get("params").asInstanceOf[java.util.Map[String, String]])