Re: PySpark Streaming S3 checkpointing

2017-08-03 Thread Riccardo Ferrari
Hi Steve,

Thank you for your answer, much appreciated.

Reading the code, it seems that:

   - Python StreamingContext.getOrCreate calls the Scala
     StreamingContextPythonHelper().tryRecoverFromCheckpoint(checkpointPath)
   - tryRecoverFromCheckpoint calls CheckpointReader.read(..., new
     SparkConf(), SparkHadoopUtil.get.conf, ...)
   - SparkHadoopUtil.get.conf (when not using YARN) does:
      - sparkConf = new SparkConf(false).loadFromSystemProperties(true)
      - Configuration = newConfiguration(sparkConf)

I have to admit I have not tested (read: debugged) this, so it might not be
completely accurate (checkpointing is not my highest priority). However, my
feeling is that I cannot provide those properties via code: on recovery a new
configuration gets instantiated and read from system properties, so whatever I
set on the currently running context is ignored (or at least that is what
happens in Python).
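
If that reading is correct, it seems the keys have to be supplied at submit
time (spark-defaults.conf or --conf), so that they end up as driver system
properties that loadFromSystemProperties can pick up, rather than being set on
hadoopConfiguration() at runtime. A rough, untested sketch of what I mean
(bucket, checkpoint dir and batch interval are placeholders):

# Launch with the keys as submit-time configuration, e.g.:
#   spark-submit \
#     --conf spark.hadoop.fs.s3a.access.key=<access key> \
#     --conf spark.hadoop.fs.s3a.secret.key=<secret key> \
#     my_streaming_job.py

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

CHECKPOINT_DIR = 's3a://<bucket>/<checkpoint dir>'

def create_ssc():
    # Only invoked when no usable checkpoint exists yet.
    sc = SparkContext.getOrCreate()
    ssc = StreamingContext(sc, 10)  # 10s batch interval
    ssc.checkpoint(CHECKPOINT_DIR)
    # ... define the DStream pipeline here ...
    return ssc

ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_ssc)
ssc.start()
ssc.awaitTermination()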

What do you (or anyone else on the list) think?

Thanks,



On Wed, Aug 2, 2017 at 6:04 PM, Steve Loughran wrote:

>
> On 2 Aug 2017, at 10:34, Riccardo Ferrari  wrote:
>
> Hi list!
>
> I am working on a PySpark streaming job (ver 2.2.0) and I need to enable
> checkpointing. At a high level my Python script goes like this:
>
> class StreamingJob():
>
>     def __init__(..):
>         ...
>         sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key', <access key>)
>         sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key', <secret key>)
>
>     def doJob(self):
>         ssc = StreamingContext.getOrCreate('<checkpoint dir>', <function to create ssc>)
>
> and I run it:
>
> myJob = StreamingJob(...)
> myJob.doJob()
>
> The problem is that StreamingContext.getOrCreate does not have access to
> the Hadoop configuration set in the constructor, and it fails to load from
> the checkpoint with
>
> "com.amazonaws.AmazonClientException: Unable to load AWS credentials from
> any provider in the chain"
>
> If I export AWS credentials to the system ENV before starting the script
> it works!
>
>
> Spark magically copies the env vars over for you when you launch a job
>
> I see the Scala version has an option to provide the Hadoop configuration,
> but that option is not available in Python.
>
> I don't have a full Hadoop installation, just Spark, so I don't really want
> to configure Hadoop's XML files and such.
>
>
> Set them when you set up the context, e.g. in spark-defaults.conf:
>
> spark.hadoop.fs.s3a.access.key=access key
> spark.hadoop.fs.s3a.secret.key=secret key
>
> Reminder: Do keep your secret key a secret, avoid checking it in to any
> form of revision control.
>


Re: PySpark Streaming S3 checkpointing

2017-08-02 Thread Steve Loughran

On 2 Aug 2017, at 10:34, Riccardo Ferrari wrote:

Hi list!

I am working on a PySpark streaming job (ver 2.2.0) and I need to enable
checkpointing. At a high level my Python script goes like this:

class StreamingJob():

    def __init__(..):
        ...
        sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key', <access key>)
        sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key', <secret key>)

    def doJob(self):
        ssc = StreamingContext.getOrCreate('<checkpoint dir>', <function to create ssc>)

and I run it:

myJob = StreamingJob(...)
myJob.doJob()

The problem is that StreamingContext.getOrCreate does not have access to the
Hadoop configuration set in the constructor, and it fails to load from the
checkpoint with

"com.amazonaws.AmazonClientException: Unable to load AWS credentials from any 
provider in the chain"

If I export AWS credentials to the system ENV before starting the script it 
works!


Spark magically copies the env vars over for you when you launch a job
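
(As far as I know the variables the default AWS provider chain reads from the
environment are AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. If you go the
env-var route, a small guard at the top of the driver script can fail fast
when they are missing; just a sketch:)

import os
import sys

# Optional guard: the s3a credential chain can read these from the
# environment, so bail out early if they were not exported before launch.
for var in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    if var not in os.environ:
        sys.exit('%s is not set; export it before starting the job' % var)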


I see the Scala version has an option to provide the Hadoop configuration, but
that option is not available in Python.

I don't have a full Hadoop installation, just Spark, so I don't really want to
configure Hadoop's XML files and such.


Set them when you set up the context, e.g. in spark-defaults.conf:

spark.hadoop.fs.s3a.access.key=access key
spark.hadoop.fs.s3a.secret.key=secret key

Reminder: Do keep your secret key a secret, avoid checking it in to any form of 
revision control.
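
A quick way to sanity-check from PySpark that spark.hadoop.* entries set this
way actually reached the Hadoop configuration of the running context (just a
sketch; it only reports presence, so the values themselves are not printed):

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
hconf = sc._jsc.hadoopConfiguration()
# spark.hadoop.fs.s3a.access.key in spark-defaults.conf should surface here
# with the spark.hadoop. prefix stripped.
print(hconf.get('fs.s3a.access.key') is not None)
print(hconf.get('fs.s3a.secret.key') is not None)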


PySpark Streaming S3 checkpointing

2017-08-02 Thread Riccardo Ferrari
Hi list!

I am working on a PySpark streaming job (ver 2.2.0) and I need to enable
checkpointing. At a high level my Python script goes like this:

class StreamingJob():

    def __init__(..):
        ...
        sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key', <access key>)
        sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key', <secret key>)

    def doJob(self):
        ssc = StreamingContext.getOrCreate('<checkpoint dir>', <function to create ssc>)

and I run it:

myJob = StreamingJob(...)
myJob.doJob()

The problem is that StreamingContext.getOrCreate does not have access to the
Hadoop configuration set in the constructor, and it fails to load from the
checkpoint with

"com.amazonaws.AmazonClientException: Unable to load AWS credentials from
any provider in the chain"

If I export AWS credentials to the system ENV before starting the script it
works!

I see the Scala version has an option to provide the Hadoop configuration, but
that option is not available in Python.

I don't have a full Hadoop installation, just Spark, so I don't really want to
configure Hadoop's XML files and such.

What is the cleanest way to achieve my goal?

 thanks!