Thanks Cody.

You confirmed that I'm not doing something wrong. I will keep investigating
and if I find something I let everybody know.

Thanks again.

Regards,

Ricardo

On Mon, Sep 14, 2015 at 6:29 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Yeah, looks like you're right about being unable to change those.  Upon
> further reading, even though StreamingContext.getOrCreate makes an entirely
> new spark conf, Checkpoint will only reload certain properties.
>
> I'm not sure if it'd be safe to include memory / cores among those
> properties that get re-loaded, TD would be a better person to ask.
>
> On Mon, Sep 14, 2015 at 2:54 PM, Ricardo Paiva <
> ricardo.pa...@corp.globo.com> wrote:
>
>> Hi Cody,
>>
>> Thanks for your answer.
>>
>> I had already tried to change the spark submit parameters, but I double
>> checked to reply your answer. Even changing properties file or directly on
>> the spark-submit arguments, none of them work when the application runs
>> from the checkpoint. It seems that everything is cached. I changed driver
>> memory, executor memory, executor cores and number of executors.
>>
>> So, the scenario I have today is: once the Spark Streaming application
>> retrieves the data from the checkpoint, I can't change the submission
>> parameters neither the code parameters without remove the checkpoint
>> folder, loosing all the data used by windowed functions. I was wondering
>> what kind of parameters are you guys loading from the configuration file,
>> when using checkpoints.
>>
>> I really appreciate all the help on this.
>>
>> Many thanks,
>>
>> Ricardo
>>
>>
>>
>>
>>
>>
>>
>>
>> On Fri, Sep 11, 2015 at 11:09 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Yeah, it makes sense that parameters that are read only during your
>>> getOrCCreate function wouldn't be re-read, since that function isn't called
>>> if a checkpoint is loaded.
>>>
>>> I would have thought changing number of executors and other things used
>>> by spark-submit would work on checkpoint restart.  Have you tried both
>>> changing them in the properties file provided to spark submit, and the
>>> --arguments that correspond to number of cores / executor memory?
>>>
>>> On Thu, Sep 10, 2015 at 5:23 PM, Ricardo Luis Silva Paiva <
>>> ricardo.pa...@corp.globo.com> wrote:
>>>
>>>>
>>>> Hi guys,
>>>>
>>>> I tried to use the configuration file, but it didn't work as I
>>>> expected. As part of the Spark Streaming flow, my methods run only when the
>>>> application is started the first time. Once I restart the app, it reads
>>>> from the checkpoint and all the dstream operations come from the cache. No
>>>> parameter is reloaded.
>>>>
>>>> I would like to know if it's possible to reset the time of windowed
>>>> operations, checkpoint time etc. I also would like to change the submission
>>>> parameters, like number of executors, memory per executor or driver etc. If
>>>> it's not possible, what kind of parameters do you guys usually use in a
>>>> configuration file. I know that the streaming interval it not possible to
>>>> be changed.
>>>>
>>>> This is my code:
>>>>
>>>> def main(args: Array[String]): Unit = {
>>>>   val ssc = StreamingContext.getOrCreate(CHECKPOINT_FOLDER,
>>>> createSparkContext _)
>>>>   ssc.start()
>>>>   ssc.awaitTermination()
>>>>   ssc.stop()
>>>> }
>>>>
>>>> def createSparkContext(): StreamingContext = {
>>>>   val sparkConf = new SparkConf()
>>>>      .setAppName(APP_NAME)
>>>>      .set("spark.streaming.unpersist", "true")
>>>>   val ssc = new StreamingContext(sparkConf, streamingInterval)
>>>>   ssc.checkpoint(CHECKPOINT_FOLDER)
>>>>   ssc.sparkContext.addFile(CONFIG_FILENAME)
>>>>
>>>>   val rawStream = createKafkaRDD(ssc)
>>>>   processAndSave(rawStream)
>>>>   return ssc
>>>> }
>>>>
>>>> def processAndSave(rawStream:DStream[(String, Array[Byte])]): Unit = {
>>>>
>>>>   val configFile = SparkFiles.get("config.properties")
>>>>   val config:Config = ConfigFactory.parseFile(new File(configFile))
>>>>
>>>>
>>>> *  slidingInterval =
>>>> Minutes(config.getInt("streaming.sliding.interval"))  windowLength =
>>>> Minutes(config.getInt("streaming.window.interval"))  minPageview =
>>>> config.getInt("streaming.pageview.min")*
>>>>
>>>>
>>>>   val pageviewStream = rawStream.map{ case (_, raw) =>
>>>> (PageViewParser.parseURL(raw), 1L) }
>>>>   val pageviewsHourlyCount = 
>>>> stream.reduceByKeyAndWindow(PageViewAgregator.pageviewsSum
>>>> _,
>>>>
>>>> PageViewAgregator.pageviewsMinus _,
>>>>                                                          *windowLength*,
>>>>
>>>>
>>>> *slidingInterval*)
>>>>
>>>>   val permalinkAudienceStream = pageviewsHourlyCount.filter(_._2 >=
>>>> *minPageview*)
>>>>   permalinkAudienceStream.map(a => s"${a._1}\t${a._2}")
>>>>                              .repartition(1)
>>>>                              .saveAsTextFiles(DESTINATION_FILE, "txt")
>>>>
>>>> }
>>>>
>>>> I really appreciate any help on this.
>>>>
>>>> Many thanks,
>>>>
>>>> Ricardo
>>>>
>>>> On Thu, Sep 3, 2015 at 1:58 PM, Ricardo Luis Silva Paiva <
>>>> ricardo.pa...@corp.globo.com> wrote:
>>>>
>>>>> Good tip. I will try that.
>>>>>
>>>>> Thank you.
>>>>>
>>>>> On Wed, Sep 2, 2015 at 6:54 PM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> Yeah, in general if you're changing the jar you can't recover the
>>>>>> checkpoint.
>>>>>>
>>>>>> If you're just changing parameters, why not externalize those in a
>>>>>> configuration file so your jar doesn't change?  I tend to stick even my
>>>>>> app-specific parameters in an external spark config so everything is in 
>>>>>> one
>>>>>> place.
>>>>>>
>>>>>> On Wed, Sep 2, 2015 at 4:48 PM, Ricardo Luis Silva Paiva <
>>>>>> ricardo.pa...@corp.globo.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Is there a way to submit an app code change, keeping the checkpoint
>>>>>>> data or do I need to erase the checkpoint folder every time I re-submit 
>>>>>>> the
>>>>>>> spark app with a new jar?
>>>>>>>
>>>>>>> I have an app that count pageviews streaming from Kafka, and deliver
>>>>>>> a file every hour from the past 24 hours. I'm using reduceByKeyAndWindow
>>>>>>> with the reduce and inverse functions set.
>>>>>>>
>>>>>>> I'm doing some code improvements and would like to keep the data
>>>>>>> from the past hours, so when I re-submit a code change, I would keep
>>>>>>> delivering the pageviews aggregation without need to wait for 24 hours 
>>>>>>> of
>>>>>>> new data. Sometimes I'm just changing the submission parameters, like
>>>>>>> number of executors, memory and cores.
>>>>>>>
>>>>>>> Many thanks,
>>>>>>>
>>>>>>> Ricardo
>>>>>>>
>>>>>>> --
>>>>>>> Ricardo Paiva
>>>>>>> Big Data / Semântica
>>>>>>> *globo.com* <http://www.globo.com>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Ricardo Paiva
>>>>> Big Data / Semântica
>>>>> *globo.com* <http://www.globo.com>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Ricardo Paiva
>>>> Big Data / Semântica
>>>> *globo.com* <http://www.globo.com>
>>>>
>>>
>>>
>>
>>
>> --
>> Ricardo Paiva
>> Big Data / Semântica
>> 2483-6432
>> *globo.com* <http://www.globo.com>
>>
>
>


-- 
Ricardo Paiva
Big Data / Semântica
2483-6432
*globo.com* <http://www.globo.com>

Reply via email to