Hyukjin,

This is what I have done so far. I haven't used Datasets yet; maybe I don't need to.

// The $"..." column syntax needs the SQL implicits in scope.
import sqlContext.implicits._

var df: DataFrame = null
for (message <- messages) {
  // Parse each SQS message body as a one-record JSON DataFrame.
  val bodyRdd = sc.parallelize(message.getBody() :: Nil)
  val fileDf = sqlContext.read.json(bodyRdd)
    .select(
      // "Records" is an array, so these columns come back as arrays.
      $"Records.s3.bucket.name".as("bucket"),
      $"Records.s3.object.key".as("key")
    )
  if (df != null) {
    df = df.unionAll(fileDf)
  } else {
    df = fileDf
  }
}
df.show

Each result comes back as an array. I just need to concatenate the bucket and key 
to form the S3 URL and then download the file at each URL. This is the part I need 
help with next.
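
Something along these lines is what I had in mind for the next step (untested; it 
reworks the select above to explode "Records" so bucket and key pair up one per row, 
and it assumes the AWS Java SDK S3 client plus a placeholder download directory):

import java.io.File
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.GetObjectRequest
import org.apache.spark.sql.functions.explode

// bodyRdd is the single-message RDD from the loop above.
// Exploding "Records" lines up bucket and key one pair per row
// instead of returning them as parallel arrays.
val records = sqlContext.read.json(bodyRdd)
  .select(explode($"Records").as("r"))
  .select($"r.s3.bucket.name".as("bucket"), $"r.s3.object.key".as("key"))

// Only a handful of notifications per message, so collecting to the
// driver and downloading there should be fine.
val s3 = new AmazonS3Client()              // assumes the default credential chain
val downloadDir = "/tmp/s3-downloads"      // placeholder path
records.collect().foreach { row =>
  val (bucket, key) = (row.getString(0), row.getString(1))
  s3.getObject(new GetObjectRequest(bucket, key), new File(s"$downloadDir/$key"))
}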

Thanks,
Ben

> On Apr 17, 2016, at 7:38 AM, Hyukjin Kwon <gurwls...@gmail.com> wrote:
> 
> Hi!
> 
> Personally, I don't think it necessarily needs to be DataSet for your goal.
> 
> Just select your data at "s3" from the DataFrame loaded by sqlContext.read.json().
> 
> You can try printSchema() to check the nested schema and then select the 
> data you need.
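> 
> For example, something like this (just a sketch against the structure you posted; 
> bodyRdd here stands for an RDD[String] of the message bodies, and the $ syntax 
> needs import sqlContext.implicits._):
> 
> val df = sqlContext.read.json(bodyRdd)
> df.printSchema()   // "Records" shows up as an array of structs
> df.select($"Records.s3.bucket.name", $"Records.s3.object.key").show()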
> 
> Also, I guess (from your code) you are trying to send a request, fetch the 
> response on the driver side, and then send each message to the executor side. 
> I guess there would be really heavy overhead on the driver side.
> 
> Holden,
> 
> If I were to use DataSets, then I would essentially do this:
> 
> val receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl)
> val messages = sqs.receiveMessage(receiveMessageRequest).getMessages()
> // needs import scala.collection.JavaConverters._ for .asScala
> for (message <- messages.asScala) {
>     // read.json(String) expects a path, so wrap the message body in an RDD
>     val files = sqlContext.read.json(sc.parallelize(message.getBody() :: Nil))
> }
> 
> Can I simply do files.toDS(), or do I have to create a schema using a case 
> class File and apply it with as[File]? If I have to apply a schema, how would 
> I create it based on the JSON structure below, especially the nested elements?
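> 
> (For what it's worth, this rough, untested sketch is what I imagine the case 
> classes would look like, covering only the fields I need:)
> 
> case class Bucket(name: String)
> case class S3Object(key: String)
> case class S3(bucket: Bucket, `object`: S3Object)  // "object" is a Scala keyword, hence the backticks
> case class Record(s3: S3)
> case class File(Records: Seq[Record])
> 
> // with import sqlContext.implicits._ in scope for the encoders:
> val files = sqlContext.read.json(sc.parallelize(message.getBody() :: Nil)).as[File]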
> 
> Thanks,
> Ben
> 
> 
>> On Apr 14, 2016, at 3:46 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>> 
>> You could certainly use RDDs for that. You might also find it easier to use a 
>> Dataset, selecting the fields you need to construct the URL to fetch, and then 
>> using the map function.
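>> 
>> Roughly (an untested sketch, assuming "records" is a Dataset/DataFrame with one 
>> S3 record per row, i.e. the Records array already exploded, and with 
>> import sqlContext.implicits._ in scope for the encoders):
>> 
>> val urls = records
>>   .select($"s3.bucket.name", $"s3.object.key").as[(String, String)]
>>   .map { case (bucket, key) => s"s3://$bucket/$key" }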
>> 
>> On Thu, Apr 14, 2016 at 12:01 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
>> I was wondering what would be the best way to use JSON in Spark/Scala. I need 
>> to look up values of fields in a collection of records to form a URL and 
>> download the file at that location. I was thinking an RDD would be perfect 
>> for this. I just want to hear from others who might have more experience in 
>> this. Below is the actual JSON structure that I am trying to use for the S3 
>> bucket and key values of each “record” within “Records”.
>> 
>> {
>>    "Records":[
>>       {
>>          "eventVersion":"2.0",
>>          "eventSource":"aws:s3",
>>          "awsRegion":"us-east-1",
>>          "eventTime":The time, in ISO-8601 format, for example, 
>> 1970-01-01T00:00:00.000Z, when S3 finished processing the request,
>>          "eventName":"event-type",
>>          "userIdentity":{
>>             "principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
>>          },
>>          "requestParameters":{
>>             "sourceIPAddress":"ip-address-where-request-came-from"
>>          },
>>          "responseElements":{
>>             "x-amz-request-id":"Amazon S3 generated request ID",
>>             "x-amz-id-2":"Amazon S3 host that processed the request"
>>          },
>>          "s3":{
>>             "s3SchemaVersion":"1.0",
>>             "configurationId":"ID found in the bucket notification 
>> configuration",
>>             "bucket":{
>>                "name":"bucket-name",
>>                "ownerIdentity":{
>>                   "principalId":"Amazon-customer-ID-of-the-bucket-owner"
>>                },
>>                "arn":"bucket-ARN"
>>             },
>>             "object":{
>>                "key":"object-key",
>>                "size":object-size,
>>                "eTag":"object eTag",
>>                "versionId":"object version if bucket is versioning-enabled, 
>> otherwise null",
>>                "sequencer": "a string representation of a hexadecimal value 
>> used to determine event sequence,
>>                    only used with PUTs and DELETEs"
>>             }
>>          }
>>       },
>>       {
>>           // Additional events
>>       }
>>    ]
>> }
>> 
>> Thanks
>> Ben
>> 
>> 
>> 
>> 
>> -- 
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
