Hyukjin,

This is what I have done so far. I haven’t used a Dataset yet, and maybe I don’t need to.
import org.apache.spark.sql.DataFrame
import scala.collection.JavaConverters._   // for messages.asScala (SQS returns a java.util.List)
import sqlContext.implicits._              // for the $"..." column syntax

var df: DataFrame = null
for (message <- messages.asScala) {
  // Each SQS message body is one S3 event notification in JSON
  val bodyRdd = sc.parallelize(message.getBody() :: Nil)
  val fileDf = sqlContext.read.json(bodyRdd)
    .select(
      $"Records.s3.bucket.name".as("bucket"),
      $"Records.s3.object.key".as("key")
    )
  // Append this message's rows to the running result
  if (df != null) {
    df = df.unionAll(fileDf)
  } else {
    df = fileDf
  }
}
df.show

Each result is returned as an array. Next, I need to concatenate the bucket and key to build each S3 URL and then download the file at each URL. That is what I need help with.

Thanks,
Ben

> On Apr 17, 2016, at 7:38 AM, Hyukjin Kwon <gurwls...@gmail.com> wrote:
>
> Hi!
>
> Personally, I don't think it necessarily needs to be a Dataset for your goal.
>
> Just select your data under "s3" from the DataFrame loaded by sqlContext.read.json().
>
> You can try printSchema() to check the nested schema and then select the
> data.
>
> Also, I guess (from your code) you are trying to send a request, fetch the
> response to the driver side, and then send each message to the executor side. I guess
> there would be really heavy overhead on the driver side.
>
> Holden,
>
> If I were to use Datasets, then I would essentially do this:
>
> val receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl)
> val messages = sqs.receiveMessage(receiveMessageRequest).getMessages()
> for (message <- messages.asScala) {
>   // note: read.json(String) treats its argument as a file path, not as JSON text
>   val files = sqlContext.read.json(message.getBody())
> }
>
> Can I simply do files.toDS() or do I have to create a schema using a case
> class File and apply it as[File]? If I have to apply a schema, then how would
> I create it based on the JSON structure below, especially the nested elements?
>
> Thanks,
> Ben
>
>
>> On Apr 14, 2016, at 3:46 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>
>> You could certainly use RDDs for that. You might also find it easier to use a Dataset:
>> select the fields you need to construct the URL to fetch, and then use the map function.
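A minimal sketch of the concatenation step, assuming the DataFrame above is collected to the driver so that each Row holds two parallel arrays: bucket names at index 0 and object keys at index 1 (readable with Spark's `row.getSeq[String](0)` and `row.getSeq[String](1)`). The object name `S3UrlBuilder` and the `s3://bucket/key` URL form are illustrative choices, not part of Spark or the AWS SDK. It also assumes, as the S3 event notification documentation states, that object keys arrive URL-encoded (for example, a space becomes "+"), so they are decoded first.

```scala
import java.net.URLDecoder

// Hypothetical helper: turns the parallel bucket/key arrays from one
// S3 event message into s3:// URLs, one per record.
object S3UrlBuilder {

  // S3 event notifications URL-encode object keys, so decode before use.
  def decodeKey(key: String): String = URLDecoder.decode(key, "UTF-8")

  // Pair each bucket with the key at the same position and format the URL.
  // Both arrays are projected from the same "Records" array, so they
  // should have the same length and order.
  def toUrls(buckets: Seq[String], keys: Seq[String]): Seq[String] = {
    require(buckets.length == keys.length, "bucket and key arrays must align")
    buckets.zip(keys).map { case (bucket, key) =>
      s"s3://$bucket/${decodeKey(key)}"
    }
  }
}
```

On the driver this could be applied roughly as `df.collect().flatMap(row => S3UrlBuilder.toUrls(row.getSeq[String](0), row.getSeq[String](1)))`. For the download itself, the AWS SDK for Java's `AmazonS3.getObject(bucketName, key)` takes the bucket and decoded key directly, so the URL string is mainly useful for logging or for tools that expect one.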
>>
>> On Thu, Apr 14, 2016 at 12:01 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
>> I was wondering what would be the best way to use JSON in Spark/Scala. I need
>> to look up the values of fields in a collection of records to form a URL and
>> download the file at that location. I was thinking an RDD would be perfect
>> for this. I just want to hear from others who might have more experience with
>> this. Below is the actual JSON structure that I am trying to use for the S3
>> bucket and key values of each "record" within "Records".
>>
>> {
>>    "Records":[
>>       {
>>          "eventVersion":"2.0",
>>          "eventSource":"aws:s3",
>>          "awsRegion":"us-east-1",
>>          "eventTime":The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, when S3 finished processing the request,
>>          "eventName":"event-type",
>>          "userIdentity":{
>>             "principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
>>          },
>>          "requestParameters":{
>>             "sourceIPAddress":"ip-address-where-request-came-from"
>>          },
>>          "responseElements":{
>>             "x-amz-request-id":"Amazon S3 generated request ID",
>>             "x-amz-id-2":"Amazon S3 host that processed the request"
>>          },
>>          "s3":{
>>             "s3SchemaVersion":"1.0",
>>             "configurationId":"ID found in the bucket notification configuration",
>>             "bucket":{
>>                "name":"bucket-name",
>>                "ownerIdentity":{
>>                   "principalId":"Amazon-customer-ID-of-the-bucket-owner"
>>                },
>>                "arn":"bucket-ARN"
>>             },
>>             "object":{
>>                "key":"object-key",
>>                "size":object-size,
>>                "eTag":"object eTag",
>>                "versionId":"object version if bucket is versioning-enabled, otherwise null",
>>                "sequencer": "a string representation of a hexadecimal value used to determine event sequence, only used with PUTs and DELETEs"
>>             }
>>          }
>>       },
>>       {
>>          // Additional events
>>       }
>>    ]
>> }
>>
>> Thanks
>> Ben
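For the `as[File]` question, a sketch of case classes that mirror the nesting of the JSON above, covering only the fields needed to build a URL. The class names and the `urls` helper are hypothetical; only the field names come from the JSON keys, because the Dataset encoder resolves columns by name. In Spark 1.6 this would be applied roughly as `sqlContext.read.json(bodyRdd).as[S3Event]` after `import sqlContext.implicits._`. Whether JSON fields not declared here (such as `eventVersion`) are simply ignored, and whether a `Seq` of nested case classes is supported by the encoder, are assumptions worth checking on your Spark version.

```scala
// Hypothetical case classes covering only the fields needed from the S3 event
// JSON above; the field names (not the class names) must match the JSON keys.
object S3EventModel {
  case class BucketInfo(name: String)
  // "size" is a JSON integer, which Spark's JSON schema inference reads as a long
  case class ObjectInfo(key: String, size: Long)
  // `object` is a reserved word in Scala, so the field name needs backticks
  case class S3Entity(bucket: BucketInfo, `object`: ObjectInfo)
  case class EventRecord(eventName: String, s3: S3Entity)
  // Top-level wrapper: "Records" is a JSON array of event records
  case class S3Event(Records: Seq[EventRecord])

  // Walks the nested structure and builds one s3:// URL per record
  def urls(event: S3Event): Seq[String] =
    event.Records.map { r =>
      s"s3://${r.s3.bucket.name}/${r.s3.`object`.key}"
    }
}
```

Once the data is typed this way, Holden's suggestion becomes roughly `ds.flatMap(e => S3EventModel.urls(e))` on the resulting Dataset.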