Yeah, looks like you're right about being unable to change those.  Upon
further reading, even though StreamingContext.getOrCreate builds an entirely
new SparkConf, Checkpoint will only reload certain properties.
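
If I'm reading Checkpoint.createSparkConf right, the reloaded set is
roughly the whitelist below (paraphrased and version-dependent, so check
your branch); everything else is restored from the checkpointed conf:

// Paraphrased from Checkpoint.createSparkConf in the streaming module:
// only keys like these are re-read from the new submission; all other
// conf entries come back from the checkpoint itself.
val propertiesToReload = List(
  "spark.driver.host",
  "spark.driver.port",
  "spark.master",
  "spark.yarn.keytab",
  "spark.yarn.principal",
  "spark.ui.filters")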

I'm not sure if it'd be safe to include memory / cores among the
properties that get re-loaded; TD would be a better person to ask.
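
For the app-specific values (as opposed to memory / cores), one pattern
that should work is reading them inside foreachRDD, since that code runs
every batch even after recovery, instead of once when the graph is built.
A rough sketch, assuming the config file was shipped with addFile the way
your code below does it; the helper and key names are made up:

import java.io.File

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkFiles
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: re-parse the distributed config file on every call.
def currentMinPageview(): Long = {
  val config: Config =
    ConfigFactory.parseFile(new File(SparkFiles.get("config.properties")))
  config.getLong("streaming.pageview.min")
}

def saveWithLiveThreshold(counts: DStream[(String, Long)], destination: String): Unit = {
  counts.foreachRDD { (rdd, time) =>
    // Evaluated per batch, so a changed value is picked up on the next
    // batch and survives a checkpoint restart, without a new jar.
    val minPageview = currentMinPageview()
    rdd.filter(_._2 >= minPageview)
      .map { case (url, n) => s"$url\t$n" }
      .repartition(1)
      .saveAsTextFile(s"$destination-${time.milliseconds}")
  }
}

The window and slide durations are baked into the checkpointed DStream
graph, though, so I don't think this trick can change those.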

On Mon, Sep 14, 2015 at 2:54 PM, Ricardo Paiva <ricardo.pa...@corp.globo.com> wrote:

> Hi Cody,
>
> Thanks for your answer.
>
> I had already tried changing the spark-submit parameters, but I double
> checked before replying. Whether I change the properties file or pass the
> arguments directly to spark-submit, none of them take effect when the
> application runs from the checkpoint. It seems that everything is cached.
> I changed driver memory, executor memory, executor cores and the number
> of executors.
>
> So, the scenario I have today is: once the Spark Streaming application
> retrieves the data from the checkpoint, I can't change the submission
> parameters or the code parameters without removing the checkpoint folder,
> losing all the data used by the windowed functions. I was wondering what
> kind of parameters you guys load from the configuration file when using
> checkpoints.
>
> I really appreciate all the help on this.
>
> Many thanks,
>
> Ricardo
>
> On Fri, Sep 11, 2015 at 11:09 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Yeah, it makes sense that parameters that are read only during your
>> getOrCreate function wouldn't be re-read, since that function isn't called
>> if a checkpoint is loaded.
>>
>> I would have thought that changing the number of executors and other
>> things used by spark-submit would work on a checkpoint restart.  Have you
>> tried both changing them in the properties file provided to spark-submit,
>> and the --arguments that correspond to the number of cores / executor
>> memory?
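>>
>> i.e. something like this (values made up):
>>
>> spark-submit --master yarn \
>>   --num-executors 10 \
>>   --executor-memory 4g \
>>   --executor-cores 2 \
>>   --driver-memory 2g \
>>   your-app.jar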
>>
>> On Thu, Sep 10, 2015 at 5:23 PM, Ricardo Luis Silva Paiva <ricardo.pa...@corp.globo.com> wrote:
>>
>>>
>>> Hi guys,
>>>
>>> I tried to use the configuration file, but it didn't work as I expected.
>>> As part of the Spark Streaming flow, my methods run only when the
>>> application is started for the first time. Once I restart the app, it
>>> reads from the checkpoint and all the dstream operations come from the
>>> cache; no parameter is reloaded.
>>>
>>> I would like to know if it's possible to reset the time of the windowed
>>> operations, the checkpoint interval etc. I would also like to change the
>>> submission parameters, like the number of executors, memory per executor
>>> or driver etc. If that's not possible, what kind of parameters do you
>>> guys usually put in a configuration file? I know that the streaming
>>> interval can't be changed.
>>>
>>> This is my code:
>>>
>>> def main(args: Array[String]): Unit = {
>>>   val ssc = StreamingContext.getOrCreate(CHECKPOINT_FOLDER, createSparkContext _)
>>>   ssc.start()
>>>   ssc.awaitTermination()
>>>   ssc.stop()
>>> }
>>>
>>> def createSparkContext(): StreamingContext = {
>>>   val sparkConf = new SparkConf()
>>>     .setAppName(APP_NAME)
>>>     .set("spark.streaming.unpersist", "true")
>>>   val ssc = new StreamingContext(sparkConf, streamingInterval)
>>>   ssc.checkpoint(CHECKPOINT_FOLDER)
>>>   ssc.sparkContext.addFile(CONFIG_FILENAME)
>>>
>>>   val rawStream = createKafkaRDD(ssc)
>>>   processAndSave(rawStream)
>>>   ssc
>>> }
>>>
>>> def processAndSave(rawStream: DStream[(String, Array[Byte])]): Unit = {
>>>   val configFile = SparkFiles.get("config.properties")
>>>   val config: Config = ConfigFactory.parseFile(new File(configFile))
>>>
>>>   slidingInterval = Minutes(config.getInt("streaming.sliding.interval"))
>>>   windowLength = Minutes(config.getInt("streaming.window.interval"))
>>>   minPageview = config.getInt("streaming.pageview.min")
>>>
>>>   val pageviewStream = rawStream.map { case (_, raw) =>
>>>     (PageViewParser.parseURL(raw), 1L)
>>>   }
>>>   val pageviewsHourlyCount = pageviewStream.reduceByKeyAndWindow(
>>>     PageViewAgregator.pageviewsSum _,
>>>     PageViewAgregator.pageviewsMinus _,
>>>     windowLength,
>>>     slidingInterval)
>>>
>>>   val permalinkAudienceStream = pageviewsHourlyCount.filter(_._2 >= minPageview)
>>>   permalinkAudienceStream.map(a => s"${a._1}\t${a._2}")
>>>     .repartition(1)
>>>     .saveAsTextFiles(DESTINATION_FILE, "txt")
>>> }
>>>
>>> I really appreciate any help on this.
>>>
>>> Many thanks,
>>>
>>> Ricardo
>>>
>>> On Thu, Sep 3, 2015 at 1:58 PM, Ricardo Luis Silva Paiva <ricardo.pa...@corp.globo.com> wrote:
>>>
>>>> Good tip. I will try that.
>>>>
>>>> Thank you.
>>>>
>>>> On Wed, Sep 2, 2015 at 6:54 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Yeah, in general if you're changing the jar you can't recover the
>>>>> checkpoint.
>>>>>
>>>>> If you're just changing parameters, why not externalize those in a
>>>>> configuration file so your jar doesn't change?  I tend to stick even my
>>>>> app-specific parameters in an external spark config so everything is in
>>>>> one place.
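>>>>>
>>>>> For example, something like this passed via --properties-file (the
>>>>> spark.myapp.* keys are made up, and as far as I know only keys starting
>>>>> with "spark." get picked up into the SparkConf):
>>>>>
>>>>> spark.executor.memory        4g
>>>>> spark.executor.cores         2
>>>>> # app-specific settings, read back with sparkConf.get
>>>>> spark.myapp.pageview.min     100
>>>>> spark.myapp.window.minutes   1440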
>>>>>
>>>>> On Wed, Sep 2, 2015 at 4:48 PM, Ricardo Luis Silva Paiva <ricardo.pa...@corp.globo.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Is there a way to submit an app code change while keeping the
>>>>>> checkpoint data, or do I need to erase the checkpoint folder every time
>>>>>> I re-submit the spark app with a new jar?
>>>>>>
>>>>>> I have an app that counts pageviews streaming from Kafka and delivers
>>>>>> a file every hour covering the past 24 hours. I'm using
>>>>>> reduceByKeyAndWindow with the reduce and inverse functions set.
>>>>>>
>>>>>> I'm doing some code improvements and would like to keep the data from
>>>>>> the past hours, so when I re-submit a code change I would keep
>>>>>> delivering the pageviews aggregation without needing to wait for 24
>>>>>> hours of new data. Sometimes I'm just changing the submission
>>>>>> parameters, like the number of executors, memory and cores.
>>>>>>
>>>>>> Many thanks,
>>>>>>
>>>>>> Ricardo
>>>>>>
>>>>>> --
>>>>>> Ricardo Paiva
>>>>>> Big Data / Semântica
>>>>>> *globo.com* <http://www.globo.com>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Ricardo Paiva
>>>> Big Data / Semântica
>>>> *globo.com* <http://www.globo.com>
>>>>
>>>
>>>
>>>
>>> --
>>> Ricardo Paiva
>>> Big Data / Semântica
>>> *globo.com* <http://www.globo.com>
>>>
>>
>>
>
>
> --
> Ricardo Paiva
> Big Data / Semântica
> 2483-6432
> *globo.com* <http://www.globo.com>
>
