Hi Cody,

Thanks for your answer.

I had already tried changing the spark-submit parameters, but I double-checked
before replying. Whether I change them in the properties file or directly as
spark-submit arguments, none of them take effect when the application restarts
from the checkpoint. It seems that everything is cached. I tried changing the
driver memory, executor memory, executor cores and number of executors.
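
Concretely, this is the kind of thing I'm changing between submissions (the
class and jar names below are just placeholders, and the values are
illustrative):

spark-submit \
  --master yarn-cluster \
  --class com.example.PageViewStreamingApp \
  --driver-memory 2g \
  --num-executors 10 \
  --executor-memory 4g \
  --executor-cores 2 \
  --properties-file config.properties \
  my-streaming-app.jar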

So, the scenario I have today is: once the Spark Streaming application
recovers from the checkpoint, I can't change either the submission parameters
or the application parameters without removing the checkpoint folder, which
means losing all the data used by the windowed functions. I was wondering
what kind of parameters you guys load from the configuration file when using
checkpoints.
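
Just to illustrate what I'd like to end up with: a value like the minimum
pageview threshold being looked up at batch time instead of being frozen into
the checkpointed DStream graph. The ConfigLoader object below is only a sketch
of the idea (it is hypothetical, not something I have working today):

import java.io.File
import org.apache.spark.SparkFiles
import com.typesafe.config.ConfigFactory

// Hypothetical helper: re-reads the distributed config.properties on every
// call, so the value is resolved when the batch runs rather than captured
// when the DStream graph is built and serialized into the checkpoint.
// (It re-parses the file on each call; caching is omitted for brevity.)
object ConfigLoader {
  def minPageview: Int =
    ConfigFactory.parseFile(new File(SparkFiles.get("config.properties")))
      .getInt("streaming.pageview.min")
}

// used inside the pipeline, e.g.:
// pageviewsHourlyCount.filter(_._2 >= ConfigLoader.minPageview)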

I really appreciate all the help on this.

Many thanks,

Ricardo

On Fri, Sep 11, 2015 at 11:09 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Yeah, it makes sense that parameters that are read only during your
> getOrCreate function wouldn't be re-read, since that function isn't called
> if a checkpoint is loaded.
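>
> To make that concrete, it behaves roughly like this (using the names from
> your code):
>
> import org.apache.spark.streaming.StreamingContext
>
> val ssc = StreamingContext.getOrCreate(CHECKPOINT_FOLDER, createSparkContext _)
> // createSparkContext() runs only when CHECKPOINT_FOLDER contains no valid
> // checkpoint; on a restart it is skipped entirely, and the DStream graph,
> // including any configuration values read while it was being built, is
> // deserialized from the checkpoint as-is.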
>
> I would have thought that changing the number of executors and other things
> used by spark-submit would work on a checkpoint restart.  Have you tried both
> changing them in the properties file provided to spark-submit, and the
> --arguments that correspond to number of cores / executor memory?
>
> On Thu, Sep 10, 2015 at 5:23 PM, Ricardo Luis Silva Paiva <
> ricardo.pa...@corp.globo.com> wrote:
>
>>
>> Hi guys,
>>
>> I tried to use the configuration file, but it didn't work as I expected.
>> As part of the Spark Streaming flow, my methods run only when the
>> application is started for the first time. Once I restart the app, it reads
>> from the checkpoint and all the DStream operations come from the cache; no
>> parameter is reloaded.
>>
>> I would like to know if it's possible to change the durations of the
>> windowed operations, the checkpoint interval etc. I would also like to
>> change the submission parameters, like the number of executors, memory per
>> executor or driver etc. If that's not possible, what kind of parameters do
>> you guys usually keep in a configuration file? I know that the streaming
>> batch interval cannot be changed.
>>
>> This is my code:
>>
>> def main(args: Array[String]): Unit = {
>>   val ssc = StreamingContext.getOrCreate(CHECKPOINT_FOLDER, createSparkContext _)
>>   ssc.start()
>>   ssc.awaitTermination()
>>   ssc.stop()
>> }
>>
>> def createSparkContext(): StreamingContext = {
>>   val sparkConf = new SparkConf()
>>     .setAppName(APP_NAME)
>>     .set("spark.streaming.unpersist", "true")
>>   val ssc = new StreamingContext(sparkConf, streamingInterval)
>>   ssc.checkpoint(CHECKPOINT_FOLDER)
>>   ssc.sparkContext.addFile(CONFIG_FILENAME)
>>
>>   val rawStream = createKafkaRDD(ssc)
>>   processAndSave(rawStream)
>>   ssc
>> }
>>
>> def processAndSave(rawStream: DStream[(String, Array[Byte])]): Unit = {
>>
>>   val configFile = SparkFiles.get("config.properties")
>>   val config: Config = ConfigFactory.parseFile(new File(configFile))
>>
>>   // these are the values I load from the external config file
>>   slidingInterval = Minutes(config.getInt("streaming.sliding.interval"))
>>   windowLength = Minutes(config.getInt("streaming.window.interval"))
>>   minPageview = config.getInt("streaming.pageview.min")
>>
>>   val pageviewStream = rawStream.map { case (_, raw) =>
>>     (PageViewParser.parseURL(raw), 1L)
>>   }
>>   val pageviewsHourlyCount =
>>     pageviewStream.reduceByKeyAndWindow(PageViewAgregator.pageviewsSum _,
>>                                         PageViewAgregator.pageviewsMinus _,
>>                                         windowLength,
>>                                         slidingInterval)
>>
>>   val permalinkAudienceStream = pageviewsHourlyCount.filter(_._2 >= minPageview)
>>   permalinkAudienceStream.map(a => s"${a._1}\t${a._2}")
>>                          .repartition(1)
>>                          .saveAsTextFiles(DESTINATION_FILE, "txt")
>> }
>>
>> I really appreciate any help on this.
>>
>> Many thanks,
>>
>> Ricardo
>>
>> On Thu, Sep 3, 2015 at 1:58 PM, Ricardo Luis Silva Paiva <
>> ricardo.pa...@corp.globo.com> wrote:
>>
>>> Good tip. I will try that.
>>>
>>> Thank you.
>>>
>>> On Wed, Sep 2, 2015 at 6:54 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> Yeah, in general if you're changing the jar you can't recover the
>>>> checkpoint.
>>>>
>>>> If you're just changing parameters, why not externalize those in a
>>>> configuration file so your jar doesn't change?  I tend to stick even my
>>>> app-specific parameters in an external spark config so everything is in one
>>>> place.
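>>>>
>>>> For example, something along these lines (the key names are made up; if I
>>>> remember right, spark-submit drops properties that don't start with
>>>> "spark.", so I prefix my app-specific ones accordingly):
>>>>
>>>> # my-app.conf, passed with: spark-submit --properties-file my-app.conf ...
>>>> spark.executor.memory=4g
>>>> spark.myapp.pageview.min=100
>>>>
>>>> // read back in the driver:
>>>> val minPageview = ssc.sparkContext.getConf.getInt("spark.myapp.pageview.min", 100)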
>>>>
>>>> On Wed, Sep 2, 2015 at 4:48 PM, Ricardo Luis Silva Paiva <
>>>> ricardo.pa...@corp.globo.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Is there a way to submit an app code change while keeping the checkpoint
>>>>> data, or do I need to erase the checkpoint folder every time I re-submit
>>>>> the Spark app with a new jar?
>>>>>
>>>>> I have an app that counts pageviews streaming from Kafka and delivers a
>>>>> file every hour covering the past 24 hours. I'm using reduceByKeyAndWindow
>>>>> with the reduce and inverse functions set.
>>>>>
>>>>> I'm doing some code improvements and would like to keep the data from the
>>>>> past hours, so that when I re-submit a code change I can keep delivering
>>>>> the pageview aggregation without needing to wait for 24 hours of new data.
>>>>> Sometimes I'm just changing the submission parameters, like the number of
>>>>> executors, memory and cores.
>>>>>
>>>>> Many thanks,
>>>>>
>>>>> Ricardo
>>>>>
>>>>> --
>>>>> Ricardo Paiva
>>>>> Big Data / Semântica
>>>>> *globo.com* <http://www.globo.com>
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Ricardo Paiva
>>> Big Data / Semântica
>>> *globo.com* <http://www.globo.com>
>>>
>>
>>
>>
>> --
>> Ricardo Paiva
>> Big Data / Semântica
>> *globo.com* <http://www.globo.com>
>>
>
>


-- 
Ricardo Paiva
Big Data / Semântica
2483-6432
*globo.com* <http://www.globo.com>
