Hi Xinh,

I tried to wrap it, but it still didn’t work. I got a 
"java.util.ConcurrentModificationException".

All,

I have been trying and trying with some help from a coworker, but it’s slow 
going. I have been able to gather a list of the S3 files I need to download.

### S3 Lists ###
import scala.collection.JavaConverters._
import java.util.ArrayList
import java.util.zip.{ZipEntry, ZipInputStream}
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{ObjectListing, S3ObjectSummary, ListObjectsRequest, GetObjectRequest}
import org.apache.commons.io.IOUtils
import org.joda.time.{DateTime, Period}
import org.joda.time.format.DateTimeFormat

val s3Bucket = "amg-events"

val formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH")
var then = DateTime.now()

var files = new ArrayList[String]

//S3 Client and List Object Request
val s3Client = new AmazonS3Client()
val listObjectsRequest = new ListObjectsRequest()
var objectListing: ObjectListing = null

//Your S3 Bucket
listObjectsRequest.setBucketName(s3Bucket)

var now = DateTime.now()
var range = Iterator.iterate(now.minusDays(1))(_.plus(Period.hours(1))).takeWhile(!_.isAfter(now))
range.foreach(ymdh => {
  //Your Folder path or Prefix
  listObjectsRequest.setPrefix(formatter.print(ymdh))

  //Adding s3:// to the paths and adding to a list
  do {
    objectListing = s3Client.listObjects(listObjectsRequest);
    for (objectSummary <- objectListing.getObjectSummaries().asScala) {
      if (objectSummary.getKey().contains(".csv.zip") &&
          objectSummary.getLastModified().after(then.toDate())) {
//        files.add(objectSummary.getKey())
        files.add("s3n://" + s3Bucket + "/" + objectSummary.getKey())
      }
    }
    listObjectsRequest.setMarker(objectListing.getNextMarker())
  } while (objectListing.isTruncated())
})
then = now

//Creating a Scala List for same
val fileList = files.asScala

//Parallelize the Scala List
val fileRDD = sc.parallelize(fileList)
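
As a side note, the hourly prefix generation above can be checked on its own, without touching S3. Here is a minimal sketch, assuming java.time in place of Joda-Time so that it runs without extra dependencies. The names `prefixFormat` and `hourlyPrefixes` are hypothetical and not part of the code above.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Same pattern as the Joda formatter above: one prefix per hour.
val prefixFormat = DateTimeFormatter.ofPattern("yyyy/MM/dd/HH")

// All hourly prefixes from `start` up to and including `end`.
def hourlyPrefixes(start: LocalDateTime, end: LocalDateTime): List[String] =
  Iterator.iterate(start)(_.plusHours(1))
    .takeWhile(t => !t.isAfter(end))
    .map(t => prefixFormat.format(t))
    .toList

// Fixed timestamps (made up for the demo) so the result is reproducible.
val end = LocalDateTime.of(2016, 3, 2, 0, 0)
val prefixes = hourlyPrefixes(end.minusDays(1), end)
```

Because both ends of the range are inclusive, a one-day window yields 25 prefixes rather than 24.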

Now, I am trying to go through the list and download each file, unzip each file 
as it comes, and pass the ZipInputStream to the CSV parser. This is where I get 
stuck.

var df: DataFrame = null
for (file <- fileList) {
  // fileList holds full s3n:// URLs, but GetObjectRequest expects the bare object key
  val key = file.stripPrefix("s3n://" + s3Bucket + "/")
  val zipfile = s3Client.getObject(new GetObjectRequest(s3Bucket, key)).getObjectContent()
  val zis = new ZipInputStream(zipfile)
  var ze = zis.getNextEntry()
//  val fileDf = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(zis)
//  if (df != null) {
//    df = df.unionAll(fileDf)
//  } else {
//    df = fileDf
//  }
}

I don’t know if I am doing it right or not. I also read that parallelizing 
fileList would allow parallel file retrieval. But, I don’t know how to proceed 
from here.
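
One possible direction for the unzip step is to read the single entry of each archive into lines of text. This is only a sketch under some assumptions: it uses just the JDK and the Scala standard library, it assumes UTF-8 content, and it builds a tiny zip in memory instead of fetching from S3. `firstEntryLines` and `zipOf` are hypothetical helper names, not part of any library.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream}
import scala.io.Source

// Read every line of the first entry in a zip stream.
// Returns Nil when the archive has no entries.
def firstEntryLines(in: InputStream): List[String] = {
  val zis = new ZipInputStream(in)
  try {
    if (zis.getNextEntry() == null) Nil
    // toList forces the read before the stream is closed in `finally`.
    else Source.fromInputStream(zis, "UTF-8").getLines().toList
  } finally zis.close()
}

// Demo-only helper (hypothetical): builds a one-entry zip in memory.
def zipOf(entryName: String, text: String): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val zos = new ZipOutputStream(bytes)
  zos.putNextEntry(new ZipEntry(entryName))
  zos.write(text.getBytes(StandardCharsets.UTF_8))
  zos.closeEntry()
  zos.close()
  bytes.toByteArray
}

val sample = zipOf("event.csv", "id,name\n1,a\n2,b\n")
val lines = firstEntryLines(new ByteArrayInputStream(sample))
```

Since the function returns a List, something like it could presumably be used inside the flatMap over sc.binaryFiles, called with content.open, which would satisfy the TraversableOnce requirement. That wiring is untested here, and it holds each file's lines in memory, so it only suits files of moderate size.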

If you can help, I would be grateful.

Thanks,
Ben


> On Mar 9, 2016, at 10:10 AM, Xinh Huynh <xinh.hu...@gmail.com> wrote:
> 
> Could you wrap the ZipInputStream in a List, since a subtype of 
> TraversableOnce[?] is required?
> 
> case (name, content) => List(new ZipInputStream(content.open))
> 
> Xinh
> 
> On Wed, Mar 9, 2016 at 7:07 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
> Hi Sabarish,
> 
> I found a similar posting online where I should use the S3 listKeys: 
> http://stackoverflow.com/questions/24029873/how-to-read-multiple-text-files-into-a-single-rdd
> Is this what you were thinking?
> 
> And, your assumption is correct. The zipped CSV file contains only a single 
> file. I found this posting: 
> http://stackoverflow.com/questions/28969757/zip-support-in-apache-spark
> I see how to do the unzipping, but I cannot get it to work when running the 
> code directly.
> 
> ...
> import java.io.{ IOException, FileOutputStream, FileInputStream, File }
> import java.util.zip.{ ZipEntry, ZipInputStream }
> import org.apache.spark.input.PortableDataStream
> 
> sc.hadoopConfiguration.set("fs.s3n.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", accessKey)
> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secretKey)
> 
> val zipFile = "s3n://events/2016/03/01/00/event-20160301.000000-4877ff81-928f-4da4-89b6-6d40a28d61c7.csv.zip"
> val zipFileRDD = sc.binaryFiles(zipFile).flatMap { case (name: String, 
> content: PortableDataStream) => new ZipInputStream(content.open) }
> 
> <console>:95: error: type mismatch;
>  found   : java.util.zip.ZipInputStream
>  required: TraversableOnce[?]
>          val zipFileRDD = sc.binaryFiles(zipFile).flatMap { case (name, 
> content) => new ZipInputStream(content.open) }
>                                                                               
>                                                   ^
> 
> Thanks,
> Ben
> 
>> On Mar 9, 2016, at 12:03 AM, Sabarish Sasidharan <sabarish....@gmail.com> wrote:
>> 
>> You can use S3's listKeys API and do a diff between consecutive listKeys to 
>> identify what's new.
>> 
>> Are there multiple files in each zip? Single file archives are processed 
>> just like text as long as it is one of the supported compression formats.
>> 
>> Regards
>> Sab
>> 
>> On Wed, Mar 9, 2016 at 10:33 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
>> I am wondering if anyone can help.
>> 
>> Our company stores zipped CSV files in S3, which has been a big headache 
>> from the start. I was wondering if anyone has created a way to iterate 
>> through several subdirectories (s3n://events/2016/03/01/00, 
>> s3n://2016/03/01/01, etc.) in S3 to find the newest files and load them. 
>> It would be a big bonus to include the unzipping of the file in the process 
>> so that the CSV can be loaded directly into a dataframe for further 
>> processing. I’m pretty sure that the S3 part of this request is not 
>> uncommon. I would think the file being zipped is uncommon. If anyone can 
>> help, I would truly be grateful for I am new to Scala and Spark. This would 
>> be a great help in learning.
>> 
>> Thanks,
>> Ben
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
>> 
> 
> 
