All,

I have more of a general Scala JSON question.

I have set up a notification on the S3 source bucket that triggers a Lambda 
function to unzip each new file placed there. The function saves the unzipped 
CSV file into a destination bucket, which in turn sends a notification to an 
SQS queue. The message body is JSON whose top level is the “Records” 
collection, which contains one or more “Record” objects. I would like to know 
how to iterate through “Records”, retrieving each “Record” to extract the 
bucket value and the key value. I would then use this information to load the 
file into a DataFrame via spark-csv. Does anyone have any experience doing 
this?

I took a quick stab at it, but I’m not sure it’s right.

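import org.json4s._
import org.json4s.jackson.JsonMethods.parse

def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(30))   // new context
    // Assumes the receiver emits each SQS message body as a String
    val messages = ssc.receiverStream(new SQSReceiver("amg-events")
        .credentials(accessKey, secretKey)
        .at(Regions.US_EAST_1)
        .withTimeout(2))

    // Collect the message bodies to the driver, because sqlContext
    // cannot be used inside executor-side closures.
    messages.foreachRDD { rdd =>
        implicit val formats = DefaultFormats
        for (body <- rdd.collect(); record <- (parse(body) \ "Records").children) {
            val bucket = (record \ "s3" \ "bucket" \ "name").extract[String]
            // S3 URL-encodes object keys in event notifications
            val key = java.net.URLDecoder.decode(
                (record \ "s3" \ "object" \ "key").extract[String], "UTF-8")
            val df = sqlContext.read
                .format("com.databricks.spark.csv")
                .option("header", "true") // Use first line of all files as header
                .option("inferSchema", "true") // Automatically infer data types
                .load("s3://" + bucket + "/" + key)
            //save to hbase
        }
    }

    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}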
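
To try the JSON part on its own, outside of streaming, something like the 
sketch below might be a starting point. It assumes json4s is on the classpath 
(it ships with Spark) and that the SQS message body is the S3 event itself, 
not wrapped by SNS. The names S3Object and extractObjects and the sample 
message are made up for illustration, and the sample is trimmed down to the 
fields used here. It is meant to be pasted into spark-shell or a Scala REPL.

```scala
import java.net.URLDecoder
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Hypothetical holder for the two fields of interest.
case class S3Object(bucket: String, key: String)

// Hypothetical helper: returns one S3Object per entry in "Records".
// Returns an empty list if the body has no "Records" field.
def extractObjects(body: String): List[S3Object] = {
  implicit val formats: Formats = DefaultFormats
  for (record <- (parse(body) \ "Records").children) yield {
    val bucket = (record \ "s3" \ "bucket" \ "name").extract[String]
    val rawKey = (record \ "s3" \ "object" \ "key").extract[String]
    // Object keys arrive URL-encoded in S3 event notifications.
    S3Object(bucket, URLDecoder.decode(rawKey, "UTF-8"))
  }
}

// Made-up, trimmed-down message body containing two records.
val sample =
  """{"Records":[
    |  {"s3":{"bucket":{"name":"my-bucket"},"object":{"key":"in/a.csv"}}},
    |  {"s3":{"bucket":{"name":"my-bucket"},"object":{"key":"in/my+file.csv"}}}
    |]}""".stripMargin

extractObjects(sample).foreach(println)
```

Keeping the parsing in a small pure function like this makes it possible to 
check it against a saved message before wiring it into the streaming job.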

Thanks,
Ben

> On Apr 9, 2016, at 6:12 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
> 
> Ah, I spoke too soon.
> 
> I thought the SQS part was going to be a Spark package. It looks like it has 
> to be compiled into a jar for use. Am I right? Can someone help with this? I 
> tried to compile it using SBT, but I’m stuck on a “SonatypeKeys not found” 
> error.
> 
> If there’s an easier alternative, please let me know.
> 
> Thanks,
> Ben
> 
> 
>> On Apr 9, 2016, at 2:49 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
>> 
>> This was easy!
>> 
>> I just created a notification on a source S3 bucket to kick off a Lambda 
>> function that decompresses the dropped file and saves it to another S3 
>> bucket. In turn, this second S3 bucket has a notification that sends an SNS 
>> message to me via email. I can just as easily set up SQS to be the endpoint 
>> of this notification. This would then convey to a listening Spark Streaming 
>> job the information about the file to download. I like this!
>> 
>> Cheers,
>> Ben 
>> 
>>> On Apr 9, 2016, at 9:54 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
>>> 
>>> This is awesome! I have someplace to start from.
>>> 
>>> Thanks,
>>> Ben
>>> 
>>> 
>>>> On Apr 9, 2016, at 9:45 AM, programminggee...@gmail.com wrote:
>>>> 
>>>> Someone please correct me if I am wrong, as I am still rather new to 
>>>> Spark. However, it appears that through the S3 notification mechanism 
>>>> described below, you can publish events to SQS and use SQS as a streaming 
>>>> source into Spark. The project at 
>>>> https://github.com/imapi/spark-sqs-receiver appears to provide libraries 
>>>> for doing this.
>>>> 
>>>> Hope this helps.
>>>> 
>>>> Sent from my iPhone
>>>> 
>>>> On Apr 9, 2016, at 9:55 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
>>>> 
>>>>> Nezih,
>>>>> 
>>>>> This looks like a good alternative to having the Spark Streaming job 
>>>>> check for new files on its own. Do you know if there is a way to have the 
>>>>> Spark Streaming job get notified with the new file information and act 
>>>>> upon it? This can reduce the overhead and cost of polling S3. Plus, I can 
>>>>> use this to notify and kick off Lambda to process new data files and make 
>>>>> them ready for Spark Streaming to consume. This will also use 
>>>>> notifications to trigger. I just need to have all incoming folders 
>>>>> configured for notifications for Lambda and all outgoing folders for 
>>>>> Spark Streaming. This sounds like a better setup than we have now.
>>>>> 
>>>>> Thanks,
>>>>> Ben
>>>>> 
>>>>>> On Apr 9, 2016, at 12:25 AM, Nezih Yigitbasi <nyigitb...@netflix.com> wrote:
>>>>>> 
>>>>>> While it is doable in Spark, S3 also supports notifications: 
>>>>>> http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
>>>>>> 
>>>>>> 
>>>>>> On Fri, Apr 8, 2016 at 9:15 PM Natu Lauchande <nlaucha...@gmail.com> wrote:
>>>>>> Hi Benjamin,
>>>>>> 
>>>>>> I have done it. The critical configuration items are the ones below:
>>>>>> 
>>>>>>       ssc.sparkContext.hadoopConfiguration.set("fs.s3n.impl",
>>>>>>         "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
>>>>>>       ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId",
>>>>>>         AccessKeyId)
>>>>>>       ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey",
>>>>>>         AWSSecretAccessKey)
>>>>>> 
>>>>>>       // s3n:// scheme, so that the fs.s3n.* settings above apply
>>>>>>       val inputS3Stream = ssc.textFileStream("s3n://example_bucket/folder")
>>>>>> 
>>>>>> This code will check for newly created S3 files at every batch 
>>>>>> interval.
>>>>>> 
>>>>>> Thanks,
>>>>>> Natu
>>>>>> 
>>>>>> On Fri, Apr 8, 2016 at 9:14 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
>>>>>> Has anyone monitored an S3 bucket or directory using Spark Streaming and 
>>>>>> pulled any new files to process? If so, can you provide basic Scala 
>>>>>> coding help on this?
>>>>>> 
>>>>>> Thanks,
>>>>>> Ben
>>>>>> 
