Thanks Cody. You confirmed that I'm not doing something wrong. I will keep investigating, and if I find something I'll let everybody know.
Thanks again.

Regards,

Ricardo

On Mon, Sep 14, 2015 at 6:29 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Yeah, looks like you're right about being unable to change those. Upon
> further reading, even though StreamingContext.getOrCreate makes an
> entirely new SparkConf, Checkpoint will only reload certain properties.
>
> I'm not sure if it'd be safe to include memory / cores among the
> properties that get reloaded; TD would be a better person to ask.
>
> On Mon, Sep 14, 2015 at 2:54 PM, Ricardo Paiva <ricardo.pa...@corp.globo.com> wrote:
>
>> Hi Cody,
>>
>> Thanks for your answer.
>>
>> I had already tried to change the spark-submit parameters, but I
>> double-checked before replying. Whether I change them in the properties
>> file or directly in the spark-submit arguments, none of them take effect
>> when the application resumes from the checkpoint. It seems that
>> everything is cached. I changed driver memory, executor memory, executor
>> cores and the number of executors.
>>
>> So the scenario I have today is: once the Spark Streaming application
>> recovers its data from the checkpoint, I can't change either the
>> submission parameters or the code parameters without removing the
>> checkpoint folder, losing all the data used by the windowed functions.
>> I was wondering what kind of parameters you load from the configuration
>> file when using checkpoints.
>>
>> I really appreciate all the help on this.
>>
>> Many thanks,
>>
>> Ricardo
>>
>> On Fri, Sep 11, 2015 at 11:09 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> Yeah, it makes sense that parameters that are read only inside your
>>> getOrCreate function wouldn't be re-read, since that function isn't
>>> called if a checkpoint is loaded.
>>>
>>> I would have thought that changing the number of executors and other
>>> things used by spark-submit would work on checkpoint restart. Have you
>>> tried both changing them in the properties file provided to
>>> spark-submit and the command-line arguments that correspond to number
>>> of cores / executor memory?
>>>
>>> On Thu, Sep 10, 2015 at 5:23 PM, Ricardo Luis Silva Paiva <ricardo.pa...@corp.globo.com> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> I tried to use the configuration file, but it didn't work as I
>>>> expected. As part of the Spark Streaming flow, my methods run only
>>>> when the application is started for the first time. Once I restart
>>>> the app, it reads from the checkpoint and all the DStream operations
>>>> come from the cache. No parameter is reloaded.
>>>>
>>>> I would like to know if it's possible to reset the durations of the
>>>> windowed operations, the checkpoint interval, etc. I would also like
>>>> to change the submission parameters, such as the number of executors,
>>>> memory per executor or driver, etc. If that's not possible, what kind
>>>> of parameters do you usually keep in a configuration file? I know
>>>> that the streaming interval cannot be changed.
>>>>
>>>> This is my code:
>>>>
>>>> def main(args: Array[String]): Unit = {
>>>>   val ssc = StreamingContext.getOrCreate(CHECKPOINT_FOLDER, createSparkContext _)
>>>>   ssc.start()
>>>>   ssc.awaitTermination()
>>>>   ssc.stop()
>>>> }
>>>>
>>>> def createSparkContext(): StreamingContext = {
>>>>   val sparkConf = new SparkConf()
>>>>     .setAppName(APP_NAME)
>>>>     .set("spark.streaming.unpersist", "true")
>>>>   val ssc = new StreamingContext(sparkConf, streamingInterval)
>>>>   ssc.checkpoint(CHECKPOINT_FOLDER)
>>>>   ssc.sparkContext.addFile(CONFIG_FILENAME)
>>>>
>>>>   val rawStream = createKafkaRDD(ssc)
>>>>   processAndSave(rawStream)
>>>>   ssc
>>>> }
>>>>
>>>> def processAndSave(rawStream: DStream[(String, Array[Byte])]): Unit = {
>>>>
>>>>   val configFile = SparkFiles.get("config.properties")
>>>>   val config: Config = ConfigFactory.parseFile(new File(configFile))
>>>>
>>>>   slidingInterval = Minutes(config.getInt("streaming.sliding.interval"))
>>>>   windowLength = Minutes(config.getInt("streaming.window.interval"))
>>>>   minPageview = config.getInt("streaming.pageview.min")
>>>>
>>>>   val pageviewStream = rawStream.map { case (_, raw) =>
>>>>     (PageViewParser.parseURL(raw), 1L)
>>>>   }
>>>>   val pageviewsHourlyCount =
>>>>     pageviewStream.reduceByKeyAndWindow(PageViewAgregator.pageviewsSum _,
>>>>                                         PageViewAgregator.pageviewsMinus _,
>>>>                                         windowLength,
>>>>                                         slidingInterval)
>>>>
>>>>   val permalinkAudienceStream = pageviewsHourlyCount.filter(_._2 >= minPageview)
>>>>   permalinkAudienceStream.map(a => s"${a._1}\t${a._2}")
>>>>     .repartition(1)
>>>>     .saveAsTextFiles(DESTINATION_FILE, "txt")
>>>> }
>>>>
>>>> I really appreciate any help on this.
>>>>
>>>> Many thanks,
>>>>
>>>> Ricardo
>>>>
>>>> On Thu, Sep 3, 2015 at 1:58 PM, Ricardo Luis Silva Paiva <ricardo.pa...@corp.globo.com> wrote:
>>>>
>>>>> Good tip. I will try that.
>>>>>
>>>>> Thank you.
>>>>>
>>>>> On Wed, Sep 2, 2015 at 6:54 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>
>>>>>> Yeah, in general if you're changing the jar you can't recover the
>>>>>> checkpoint.
>>>>>>
>>>>>> If you're just changing parameters, why not externalize those in a
>>>>>> configuration file so your jar doesn't change? I tend to stick even
>>>>>> my app-specific parameters in an external spark config so everything
>>>>>> is in one place.
>>>>>>
>>>>>> On Wed, Sep 2, 2015 at 4:48 PM, Ricardo Luis Silva Paiva <ricardo.pa...@corp.globo.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Is there a way to submit an app code change and keep the checkpoint
>>>>>>> data, or do I need to erase the checkpoint folder every time I
>>>>>>> re-submit the spark app with a new jar?
>>>>>>>
>>>>>>> I have an app that counts pageviews streamed from Kafka and delivers
>>>>>>> a file every hour covering the past 24 hours. I'm using
>>>>>>> reduceByKeyAndWindow with the reduce and inverse functions set.
>>>>>>>
>>>>>>> I'm doing some code improvements and would like to keep the data
>>>>>>> from the past hours, so when I re-submit a code change I would keep
>>>>>>> delivering the pageview aggregation without needing to wait for 24
>>>>>>> hours of new data. Sometimes I'm just changing the submission
>>>>>>> parameters, like the number of executors, memory and cores.
>>>>>>>
>>>>>>> Many thanks,
>>>>>>>
>>>>>>> Ricardo

--
Ricardo Paiva
Big Data / Semântica
2483-6432
*globo.com* <http://www.globo.com>
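For anyone landing on this thread later, below is a minimal, self-contained sketch of the getOrCreate behaviour discussed above, assuming the Spark 1.x Streaming Scala API. The checkpoint path, socket source, application name and durations are illustrative placeholders, not values from Ricardo's application.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

object GetOrCreateSketch {

  // Placeholder checkpoint location (assumption, not from the thread).
  val CHECKPOINT_FOLDER = "hdfs:///tmp/pageview-checkpoint"

  // Builds the whole DStream graph. StreamingContext.getOrCreate only calls
  // this function when no checkpoint exists under CHECKPOINT_FOLDER; on a
  // restart, the graph and every value captured while building it (window
  // length, slide interval, thresholds read from a config file) come back
  // from the checkpoint instead of being re-read.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("getOrCreate-sketch")
    val ssc = new StreamingContext(conf, Seconds(60))
    ssc.checkpoint(CHECKPOINT_FOLDER)

    // Placeholder source; in the thread above this is a Kafka stream.
    val lines = ssc.socketTextStream("localhost", 9999)

    // The window and slide durations are baked into the checkpointed graph;
    // changing them on a re-submit has no effect unless the checkpoint
    // folder is removed first.
    lines.map(line => (line, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(60), Minutes(5))
      .print()

    ssc
  }

  def main(args: Array[String]): Unit = {
    // First submit: createContext() runs and all parameters are read.
    // Re-submit with the checkpoint present: createContext() is skipped.
    val ssc = StreamingContext.getOrCreate(CHECKPOINT_FOLDER, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}

This is only meant to illustrate Cody's point: anything captured while the graph is built is restored from the checkpoint on restart, and Checkpoint reloads only certain submission properties, so whether changed memory/cores settings are picked up on restart remained an open question in the thread.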