Re: Is it required to remove checkpoint when submitting a code change?

2015-09-14 Thread Ricardo Paiva
Thanks Cody.

You confirmed that I'm not doing something wrong. I will keep investigating,
and if I find anything I'll let everybody know.

Thanks again.

Regards,

Ricardo


Re: Is it required to remove checkpoint when submitting a code change?

2015-09-14 Thread Cody Koeninger
Yeah, looks like you're right about being unable to change those. Upon
further reading, even though StreamingContext.getOrCreate makes an entirely
new SparkConf, Checkpoint will only reload certain properties.

I'm not sure if it'd be safe to include memory / cores among those
properties that get reloaded; TD would be a better person to ask.
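If it helps to picture it, the recovery path is conceptually something like
the sketch below. This is only an illustration of the idea, not Spark's
actual code, and the exact whitelist of re-applied properties depends on the
Spark version.

import org.apache.spark.SparkConf

// Sketch of the idea only: the SparkConf is rebuilt from the key/value pairs
// saved in the checkpoint, and only a small whitelist of properties is taken
// from the new submission. The whitelist below is illustrative, not the
// exact list your Spark version uses.
def restoreConf(checkpointedPairs: Seq[(String, String)],
                newConf: SparkConf): SparkConf = {
  val restored = new SparkConf(loadDefaults = false).setAll(checkpointedPairs)
  val reloadOnRestart = Seq("spark.master", "spark.driver.host", "spark.driver.port")
  for (key <- reloadOnRestart; value <- newConf.getOption(key)) {
    restored.set(key, value) // everything else keeps its checkpointed value
  }
  restored // memory/cores settings therefore stay as they were at first submit
}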


Re: Is it required to remove checkpoint when submitting a code change?

2015-09-14 Thread Ricardo Paiva
Hi Cody,

Thanks for your answer.

I had already tried to change the spark-submit parameters, but I
double-checked before replying. Whether I change them in the properties file
or directly in the spark-submit arguments, none of them take effect when the
application resumes from the checkpoint. It seems that everything is cached.
I changed driver memory, executor memory, executor cores and the number of
executors.

So, the scenario I have today is: once the Spark Streaming application
recovers its data from the checkpoint, I can't change either the submission
parameters or the code parameters without removing the checkpoint folder,
losing all the data used by the windowed functions. I was wondering what
kind of parameters you guys load from the configuration file when using
checkpoints.
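
One thing I still want to test: tunables that are read inside an output
operation might behave differently, since the value is looked up at batch
time instead of being baked into the DStream graph when it is first built. A
rough sketch of what I mean (the path and key names are placeholders, and
the window/slide durations still couldn't be changed this way):

import java.io.File
import com.typesafe.config.ConfigFactory
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: the threshold is read again on every batch, so
// editing the config file changes the filter even after a checkpoint restart.
def saveFilteredCounts(counts: DStream[(String, Long)],
                       configPath: String,        // assumed driver-local path
                       destinationPrefix: String): Unit = {
  counts.foreachRDD { (rdd, time) =>
    val config = ConfigFactory.parseFile(new File(configPath)) // re-read per batch
    val minPageview = config.getInt("streaming.pageview.min")
    rdd.filter(_._2 >= minPageview)
       .map { case (url, count) => s"$url\t$count" }
       .repartition(1)
       .saveAsTextFile(s"$destinationPrefix-${time.milliseconds}")
  }
}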

I really appreciate all the help on this.

Many thanks,

Ricardo

Re: Is it required to remove checkpoint when submitting a code change?

2015-09-11 Thread Cody Koeninger
Yeah, it makes sense that parameters that are read only during your
getOrCreate function wouldn't be re-read, since that function isn't called
if a checkpoint is loaded.

I would have thought changing the number of executors and other things used by
spark-submit would work on a checkpoint restart. Have you tried both
changing them in the properties file provided to spark-submit, and the
command-line arguments that correspond to number of cores / executor memory?
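
For reference, those flags map to plain config properties, so one quick
sanity check is printing what the restarted app is actually using. A rough
sketch (the mapping below is the standard one; printResourceSettings is just
a throwaway helper name):

import org.apache.spark.streaming.StreamingContext

// Standard flag-to-property mapping, for reference:
//   --driver-memory    -> spark.driver.memory
//   --executor-memory  -> spark.executor.memory
//   --executor-cores   -> spark.executor.cores
//   --num-executors    -> spark.executor.instances   (on YARN)
// Prints the resource settings the (possibly restarted) application really uses.
def printResourceSettings(ssc: StreamingContext): Unit =
  ssc.sparkContext.getConf.getAll
    .filter { case (k, _) => k.startsWith("spark.driver") || k.startsWith("spark.executor") }
    .sortBy(_._1)
    .foreach { case (k, v) => println(s"$k = $v") }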



Re: Is it required to remove checkpoint when submitting a code change?

2015-09-10 Thread Ricardo Luis Silva Paiva
Hi guys,

I tried to use the configuration file, but it didn't work as I expected. As
part of the Spark Streaming flow, my methods run only when the application
is started for the first time. Once I restart the app, it reads from the
checkpoint and all the DStream operations are restored from the checkpointed
graph; no parameter is reloaded.

I would like to know if it's possible to reset the duration of the windowed
operations, the checkpoint interval, etc. I would also like to change the
submission parameters, like the number of executors, memory per executor or
driver, etc. If that's not possible, what kind of parameters do you guys
usually put in a configuration file? I know that the streaming interval
cannot be changed.

This is my code:

def main(args: Array[String]): Unit = {
  val ssc = StreamingContext.getOrCreate(CHECKPOINT_FOLDER, createSparkContext _)
  ssc.start()
  ssc.awaitTermination()
  ssc.stop()
}

def createSparkContext(): StreamingContext = {
  val sparkConf = new SparkConf()
    .setAppName(APP_NAME)
    .set("spark.streaming.unpersist", "true")
  val ssc = new StreamingContext(sparkConf, streamingInterval)
  ssc.checkpoint(CHECKPOINT_FOLDER)
  ssc.sparkContext.addFile(CONFIG_FILENAME)

  val rawStream = createKafkaRDD(ssc)
  processAndSave(rawStream)
  ssc
}

def processAndSave(rawStream: DStream[(String, Array[Byte])]): Unit = {

  val configFile = SparkFiles.get("config.properties")
  val config: Config = ConfigFactory.parseFile(new File(configFile))

  // Only read when the streaming graph is first built; never re-read on a
  // checkpoint restart, because createSparkContext is skipped then.
  val slidingInterval = Minutes(config.getInt("streaming.sliding.interval"))
  val windowLength = Minutes(config.getInt("streaming.window.interval"))
  val minPageview = config.getInt("streaming.pageview.min")

  val pageviewStream = rawStream.map { case (_, raw) => (PageViewParser.parseURL(raw), 1L) }
  val pageviewsHourlyCount = pageviewStream.reduceByKeyAndWindow(
    PageViewAgregator.pageviewsSum _,
    PageViewAgregator.pageviewsMinus _,
    windowLength,
    slidingInterval)

  val permalinkAudienceStream = pageviewsHourlyCount.filter(_._2 >= minPageview)
  permalinkAudienceStream.map(a => s"${a._1}\t${a._2}")
    .repartition(1)
    .saveAsTextFiles(DESTINATION_FILE, "txt")
}

I really appreciate any help on this.

Many thanks,

Ricardo


-- 
Ricardo Paiva
Big Data / Semântica
globo.com


Re: Is it required to remove checkpoint when submitting a code change?

2015-09-03 Thread Ricardo Luis Silva Paiva
Good tip. I will try that.

Thank you.



-- 
Ricardo Paiva
Big Data / Semântica
globo.com


Is it required to remove checkpoint when submitting a code change?

2015-09-02 Thread Ricardo Luis Silva Paiva
Hi,

Is there a way to submit an app code change while keeping the checkpoint
data, or do I need to erase the checkpoint folder every time I re-submit the
Spark app with a new jar?

I have an app that counts pageviews streamed from Kafka and delivers a file
every hour covering the past 24 hours. I'm using reduceByKeyAndWindow with
the reduce and inverse functions set.
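
For context, the windowed count boils down to something like this minimal
sketch (names and durations here are illustrative, not my exact code):

import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.dstream.DStream

// Minimal sketch: the inverse function lets Spark subtract the batches that
// slide out of the window instead of recomputing the whole 24 hours, which
// is why checkpointing is required here.
def hourlyPageviews(pageviews: DStream[(String, Long)]): DStream[(String, Long)] =
  pageviews.reduceByKeyAndWindow(
    _ + _,            // merge counts entering the window
    _ - _,            // remove counts leaving the window (inverse function)
    Minutes(24 * 60), // window length: the past 24 hours
    Minutes(60)       // slide interval: one result per hour
  )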

I'm doing some code improvements and would like to keep the data from the
past hours, so that when I re-submit a code change I keep delivering the
pageview aggregation without needing to wait for 24 hours of new data.
Sometimes I'm just changing the submission parameters, like the number of
executors, memory and cores.

Many thanks,

Ricardo

-- 
Ricardo Paiva
Big Data / Semântica
globo.com


Re: Is it required to remove checkpoint when submitting a code change?

2015-09-02 Thread Cody Koeninger
Yeah, in general if you're changing the jar you can't recover the
checkpoint.

If you're just changing parameters, why not externalize those in a
configuration file so your jar doesn't change?  I tend to stick even my
app-specific parameters in an external spark config so everything is in one
place.
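
A minimal sketch of what I mean, assuming made-up spark.myapp.* keys
(anything prefixed with spark. is passed through by --conf or a
--properties-file, so the jar never has to change):

import org.apache.spark.SparkConf

// spark-submit --properties-file myapp.conf ...
//   (or: --conf spark.myapp.pageview.min=100)
// The spark.myapp.* names are placeholders for your own keys.
val sparkConf = new SparkConf().setAppName("pageview-counter")
val minPageview  = sparkConf.getInt("spark.myapp.pageview.min", 100)
val outputPrefix = sparkConf.get("spark.myapp.output.prefix", "/tmp/pageviews")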
