Hi Cody,

Thanks for your answer.
I had already tried changing the spark-submit parameters, but I double-checked before replying. Whether I change them in the properties file or pass them directly as spark-submit arguments, none of them take effect when the application restarts from the checkpoint. It seems that everything is cached. I changed the driver memory, executor memory, executor cores and number of executors.

So, the scenario I have today is: once the Spark Streaming application retrieves its data from the checkpoint, I can't change the submission parameters or the code parameters without removing the checkpoint folder, losing all the data used by the windowed functions.

I was wondering what kind of parameters you guys are loading from the configuration file when using checkpoints.

I really appreciate all the help on this.

Many thanks,

Ricardo
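P.S. One thing I am considering, as a rough and untested sketch (the helper name and the config path are placeholders, not code from my job): reading the tunable values inside a per-batch operation such as transform(), so they are re-evaluated on every batch even after the context is restored from the checkpoint. This would only help for values like the pageview threshold; the window length and sliding interval are part of the checkpointed DStream graph and still could not be changed this way.

import java.io.File

import com.typesafe.config.ConfigFactory
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: the properties file is re-read inside transform(),
// which runs once per batch on the driver, so an updated value takes effect
// on the next batch without deleting the checkpoint folder.
def filterByConfiguredThreshold(counts: DStream[(String, Long)],
                                configPath: String): DStream[(String, Long)] = {
  counts.transform { rdd =>
    val config = ConfigFactory.parseFile(new File(configPath)) // re-read every batch
    val minPageview = config.getInt("streaming.pageview.min")
    rdd.filter { case (_, count) => count >= minPageview }
  }
}

In my job this sketch would take the place of the plain filter(_._2 >= minPageview) that appears in the code further down the thread, assuming the properties file is available on the driver at a known local path.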
On Fri, Sep 11, 2015 at 11:09 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Yeah, it makes sense that parameters that are read only during your
> getOrCreate function wouldn't be re-read, since that function isn't called
> if a checkpoint is loaded.
>
> I would have thought that changing the number of executors and other things
> used by spark-submit would work on a checkpoint restart. Have you tried both
> changing them in the properties file provided to spark-submit, and the
> --arguments that correspond to number of cores / executor memory?
>
> On Thu, Sep 10, 2015 at 5:23 PM, Ricardo Luis Silva Paiva <ricardo.pa...@corp.globo.com> wrote:
>
>> Hi guys,
>>
>> I tried to use the configuration file, but it didn't work as I expected.
>> As part of the Spark Streaming flow, my methods run only when the
>> application is started the first time. Once I restart the app, it reads
>> from the checkpoint and all the dstream operations come from the cache. No
>> parameter is reloaded.
>>
>> I would like to know if it's possible to reset the time of the windowed
>> operations, the checkpoint interval etc. I would also like to change the
>> submission parameters, like the number of executors, memory per executor or
>> driver etc. If that's not possible, what kind of parameters do you guys
>> usually keep in a configuration file? I know that the streaming interval
>> cannot be changed.
>>
>> This is my code:
>>
>> def main(args: Array[String]): Unit = {
>>   val ssc = StreamingContext.getOrCreate(CHECKPOINT_FOLDER, createSparkContext _)
>>   ssc.start()
>>   ssc.awaitTermination()
>>   ssc.stop()
>> }
>>
>> def createSparkContext(): StreamingContext = {
>>   val sparkConf = new SparkConf()
>>     .setAppName(APP_NAME)
>>     .set("spark.streaming.unpersist", "true")
>>   val ssc = new StreamingContext(sparkConf, streamingInterval)
>>   ssc.checkpoint(CHECKPOINT_FOLDER)
>>   ssc.sparkContext.addFile(CONFIG_FILENAME)
>>
>>   val rawStream = createKafkaRDD(ssc)
>>   processAndSave(rawStream)
>>   ssc
>> }
>>
>> def processAndSave(rawStream: DStream[(String, Array[Byte])]): Unit = {
>>
>>   val configFile = SparkFiles.get("config.properties")
>>   val config: Config = ConfigFactory.parseFile(new File(configFile))
>>
>>   slidingInterval = Minutes(config.getInt("streaming.sliding.interval"))
>>   windowLength = Minutes(config.getInt("streaming.window.interval"))
>>   minPageview = config.getInt("streaming.pageview.min")
>>
>>   val pageviewStream = rawStream.map { case (_, raw) => (PageViewParser.parseURL(raw), 1L) }
>>   val pageviewsHourlyCount = pageviewStream.reduceByKeyAndWindow(
>>     PageViewAgregator.pageviewsSum _,
>>     PageViewAgregator.pageviewsMinus _,
>>     windowLength,
>>     slidingInterval)
>>
>>   val permalinkAudienceStream = pageviewsHourlyCount.filter(_._2 >= minPageview)
>>   permalinkAudienceStream.map(a => s"${a._1}\t${a._2}")
>>     .repartition(1)
>>     .saveAsTextFiles(DESTINATION_FILE, "txt")
>> }
>>
>> I really appreciate any help on this.
>>
>> Many thanks,
>>
>> Ricardo
>>
>> On Thu, Sep 3, 2015 at 1:58 PM, Ricardo Luis Silva Paiva <ricardo.pa...@corp.globo.com> wrote:
>>
>>> Good tip. I will try that.
>>>
>>> Thank you.
>>>
>>> On Wed, Sep 2, 2015 at 6:54 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> Yeah, in general if you're changing the jar you can't recover the
>>>> checkpoint.
>>>>
>>>> If you're just changing parameters, why not externalize those in a
>>>> configuration file so your jar doesn't change? I tend to stick even my
>>>> app-specific parameters in an external Spark config so everything is in
>>>> one place.
>>>>
>>>> On Wed, Sep 2, 2015 at 4:48 PM, Ricardo Luis Silva Paiva <ricardo.pa...@corp.globo.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Is there a way to submit an app code change while keeping the checkpoint
>>>>> data, or do I need to erase the checkpoint folder every time I re-submit
>>>>> the Spark app with a new jar?
>>>>>
>>>>> I have an app that counts pageviews streaming from Kafka and delivers a
>>>>> file every hour covering the past 24 hours. I'm using reduceByKeyAndWindow
>>>>> with the reduce and inverse functions set.
>>>>>
>>>>> I'm making some code improvements and would like to keep the data from
>>>>> the past hours, so that when I re-submit a code change I keep delivering
>>>>> the pageview aggregation without needing to wait for 24 hours of new data.
>>>>> Sometimes I'm just changing the submission parameters, like the number of
>>>>> executors, memory and cores.
>>>>>
>>>>> Many thanks,
>>>>>
>>>>> Ricardo
>>>>>
>>>>> --
>>>>> Ricardo Paiva
>>>>> Big Data / Semântica
>>>>> globo.com <http://www.globo.com>
>>>>
>>>
>>> --
>>> Ricardo Paiva
>>> Big Data / Semântica
>>> globo.com <http://www.globo.com>
>>
>> --
>> Ricardo Paiva
>> Big Data / Semântica
>> globo.com <http://www.globo.com>

--
Ricardo Paiva
Big Data / Semântica
2483-6432
globo.com <http://www.globo.com>
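For reference, a minimal sketch of the windowed count described at the bottom of this thread (a 24-hour window that slides every hour); the function name, types and exact durations are assumptions for illustration, not the actual job code:

import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.dstream.DStream

// Sketch only: pageview counts per URL over the last 24 hours, emitted every hour.
// The inverse function subtracts the batches that slide out of the window, so Spark
// keeps incremental window state in the checkpoint instead of recomputing 24 hours
// of data; that incremental state is exactly what is lost when the checkpoint
// folder is removed.
def hourlyPageviews(pairs: DStream[(String, Long)]): DStream[(String, Long)] = {
  pairs.reduceByKeyAndWindow(
    (a: Long, b: Long) => a + b,  // merge counts entering the window
    (a: Long, b: Long) => a - b,  // remove counts leaving the window
    Minutes(24 * 60),             // window length: 24 hours
    Minutes(60)                   // slide interval: 1 hour
  )
}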