All, I have more of a general Scala JSON question.
I have set up a notification on the S3 source bucket that triggers a Lambda function to unzip any new file placed there. The Lambda function then saves the unzipped CSV file into a destination bucket, which in turn sends a notification to an SQS queue. The message body is JSON whose top level is a "Records" array containing one or more "Record" objects. I would like to know how to iterate through "Records", retrieving each "Record" to extract the bucket value and the key value. I would then use this information to download the file into a DataFrame via spark-csv. Does anyone have any experience doing this?

I took a quick stab at it, but I'm not sure the JSON handling is right:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(30)) // new context
  val messages = ssc.receiverStream(new SQSReceiver("amg-events")
    .credentials(accessKey, secretKey)
    .at(Regions.US_EAST_1)
    .withTimeout(2))
  // each SQS message body is an S3 event notification in JSON
  messages.foreachRDD { rdd =>
    // collect the message bodies to the driver so sqlContext can be used
    rdd.collect().foreach { body =>
      implicit val formats = DefaultFormats
      val records = (parse(body) \ "Records").children
      records.foreach { record =>
        val bucket = (record \ "s3" \ "bucket" \ "name").extract[String]
        val key = (record \ "s3" \ "object" \ "key").extract[String]
        val df = sqlContext.read
          .format("com.databricks.spark.csv")
          .option("header", "true") // use first line of all files as header
          .option("inferSchema", "true") // automatically infer data types
          .load("s3://" + bucket + "/" + key)
        // save to HBase
      }
    }
  }
  ssc.checkpoint(checkpointDirectory) // set checkpoint directory
  ssc
}

Thanks,
Ben

> On Apr 9, 2016, at 6:12 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
> 
> Ah, I spoke too soon.
> 
> I thought the SQS part was going to be a Spark package. It looks like it has to be compiled into a jar for use. Am I right? Can someone help with this? I tried to compile it using SBT, but I'm stuck with a "SonatypeKeys not found" error.
> 
> If there's an easier alternative, please let me know.
> 
> Thanks,
> Ben
> 
> 
>> On Apr 9, 2016, at 2:49 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
>> 
>> This was easy!
>> 
>> I just created a notification on a source S3 bucket to kick off a Lambda function that decompresses the dropped file and saves it to another S3 bucket. In turn, that S3 bucket has a notification that sends an SNS message to me via email. I can just as easily set up SQS as the endpoint of this notification, which would then convey the file information to a listening Spark Streaming job to download. I like this!
>> 
>> Cheers,
>> Ben
>> 
>>> On Apr 9, 2016, at 9:54 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
>>> 
>>> This is awesome! I have someplace to start from.
>>> 
>>> Thanks,
>>> Ben
>>> 
>>> 
>>>> On Apr 9, 2016, at 9:45 AM, programminggee...@gmail.com wrote:
>>>> 
>>>> Someone please correct me if I am wrong, as I am still rather green to Spark, but it appears that through the S3 notification mechanism described below, you can publish events to SQS and use SQS as a streaming source into Spark. The project at https://github.com/imapi/spark-sqs-receiver appears to provide libraries for doing this.
>>>> 
>>>> Hope this helps.
>>>> 
>>>> Sent from my iPhone
>>>> 
>>>> On Apr 9, 2016, at 9:55 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
>>>> 
>>>>> Nezih,
>>>>> 
>>>>> This looks like a good alternative to having the Spark Streaming job check for new files on its own. Do you know if there is a way to have the Spark Streaming job get notified with the new file information and act upon it? This could reduce the overhead and cost of polling S3. Plus, I can use this to notify and kick off Lambda to process new data files and make them ready for Spark Streaming to consume. This would also use notifications to trigger.
>>>>> I just need to have all incoming folders configured with notifications for Lambda and all outgoing folders with notifications for Spark Streaming. This sounds like a better setup than what we have now.
>>>>> 
>>>>> Thanks,
>>>>> Ben
>>>>> 
>>>>>> On Apr 9, 2016, at 12:25 AM, Nezih Yigitbasi <nyigitb...@netflix.com> wrote:
>>>>>> 
>>>>>> While it is doable in Spark, S3 also supports notifications:
>>>>>> http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
>>>>>> 
>>>>>> 
>>>>>> On Fri, Apr 8, 2016 at 9:15 PM Natu Lauchande <nlaucha...@gmail.com> wrote:
>>>>>> Hi Benjamin,
>>>>>> 
>>>>>> I have done it. The critical configuration items are the ones below:
>>>>>> 
>>>>>> ssc.sparkContext.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
>>>>>> ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AccessKeyId)
>>>>>> ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AWSSecretAccessKey)
>>>>>> 
>>>>>> val inputS3Stream = ssc.textFileStream("s3://example_bucket/folder")
>>>>>> 
>>>>>> This code will probe for new S3 files created in the monitored folder every batch interval.
>>>>>> 
>>>>>> Thanks,
>>>>>> Natu
>>>>>> 
>>>>>> On Fri, Apr 8, 2016 at 9:14 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
>>>>>> Has anyone monitored an S3 bucket or directory using Spark Streaming and pulled any new files to process? If so, can you provide basic Scala coding help on this?
>>>>>> 
>>>>>> Thanks,
>>>>>> Ben
>>>>>> 
>>>>>> 
>>>>>> ---------------------------------------------------------------------
>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
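P.S. For anyone who finds this thread later, here is one way the "Records" iteration asked about above could look on its own, as a plain json4s sketch (json4s ships as a Spark dependency). This assumes the standard S3 event notification message schema and leaves out the SQSReceiver and Spark wiring; note that object keys arrive URL-encoded in these notifications, so they should be decoded before use.

```scala
import java.net.URLDecoder
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object S3EventParser {
  // Extract (bucket, key) pairs from an S3 event notification message body.
  // The body's top level is a "Records" array; each record carries the
  // bucket name and object key under the "s3" element.
  def bucketsAndKeys(body: String): List[(String, String)] = {
    implicit val formats: Formats = DefaultFormats
    val records = (parse(body) \ "Records").children
    records.map { record =>
      val bucket = (record \ "s3" \ "bucket" \ "name").extract[String]
      val key = (record \ "s3" \ "object" \ "key").extract[String]
      // keys in S3 event notifications are URL-encoded
      (bucket, URLDecoder.decode(key, "UTF-8"))
    }
  }
}

object Demo extends App {
  val sample =
    """{"Records":[{"s3":{"bucket":{"name":"amg-events"},"object":{"key":"2016/04/09/events.csv"}}}]}"""
  println(S3EventParser.bucketsAndKeys(sample))
}
```

Each returned (bucket, key) pair can then be fed straight into an "s3://" + bucket + "/" + key path for spark-csv.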