Hi Xinh,

I tried to wrap it, but it still didn’t work. I got a "java.util.ConcurrentModificationException".
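In case it helps to see where I was headed next: materializing the lines inside the flatMap, so that nothing is backed by the open stream once the task finishes. This is a rough, untested sketch only, reusing the zipFile path from the earlier message below and assuming a single CSV entry per archive:

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream

val linesRDD = sc.binaryFiles(zipFile).flatMap { case (name: String, content: PortableDataStream) =>
  val zis = new ZipInputStream(content.open)
  zis.getNextEntry()  // assumption: one CSV entry per archive
  val reader = new BufferedReader(new InputStreamReader(zis))
  // Return a fully materialized List (a TraversableOnce), not the stream itself,
  // so nothing touches the ZipInputStream after the task ends
  val lines = Iterator.continually(reader.readLine()).takeWhile(_ != null).toList
  zis.close()
  lines
}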
All,

I have been trying and trying with the help of a coworker, but it’s slow going. I have been able to gather a list of the S3 files I need to download.

### S3 Lists ###

import scala.collection.JavaConverters._
import java.util.ArrayList
import java.util.zip.{ZipEntry, ZipInputStream}
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{ObjectListing, S3ObjectSummary, ListObjectsRequest, GetObjectRequest}
import org.apache.commons.io.IOUtils
import org.joda.time.{DateTime, Period}
import org.joda.time.format.DateTimeFormat

val s3Bucket = "amg-events"
val formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH")
var then = DateTime.now()
var files = new ArrayList[String]

// S3 client and list-objects request
val s3Client = new AmazonS3Client()
val listObjectsRequest = new ListObjectsRequest()
var objectListing: ObjectListing = null

// Your S3 bucket
listObjectsRequest.setBucketName(s3Bucket)

// Walk each hourly prefix of the last 24 hours
var now = DateTime.now()
var range = Iterator.iterate(now.minusDays(1))(_.plus(Period.hours(1))).takeWhile(!_.isAfter(now))
range.foreach(ymdh => {
  // Your folder path or prefix
  listObjectsRequest.setPrefix(formatter.print(ymdh))
  // Add s3n:// paths for newly modified zipped CSVs to the list, page by page
  do {
    objectListing = s3Client.listObjects(listObjectsRequest)
    for (objectSummary <- objectListing.getObjectSummaries().asScala) {
      if (objectSummary.getKey().contains(".csv.zip") && objectSummary.getLastModified().after(then.toDate())) {
        // files.add(objectSummary.getKey())
        files.add("s3n://" + s3Bucket + "/" + objectSummary.getKey())
      }
    }
    listObjectsRequest.setMarker(objectListing.getNextMarker())
  } while (objectListing.isTruncated())
})
then = now

// Create a Scala List of the same
val fileList = files.asScala

// Parallelize the Scala List
val fileRDD = sc.parallelize(fileList)

Now, I am trying to go through the list, download each file, unzip each file as it comes, and pass the ZipInputStream to the CSV parser. This is where I get stuck.

import org.apache.spark.sql.DataFrame

var df: DataFrame = null
for (file <- fileList) {
  // getObject() needs the bare key, not the s3n:// URL built above
  val key = file.stripPrefix("s3n://" + s3Bucket + "/")
  val zipfile = s3Client.getObject(new GetObjectRequest(s3Bucket, key)).getObjectContent()
  val zis = new ZipInputStream(zipfile)
  var ze = zis.getNextEntry()
  // load() takes a path, not a stream, so this part does not compile:
  // val fileDf = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(zis)
  // if (df != null) {
  //   df = df.unionAll(fileDf)
  // } else {
  //   df = fileDf
  // }
}

I don’t know if I am doing it right or not. I also read that parallelizing fileList would allow parallel file retrieval, but I don’t know how to proceed from here. One direction I was considering is sketched below. If you can help, I would be grateful.
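A rough, untested sketch of that direction. It assumes each archive holds exactly one CSV with a header row, that the executors can reach S3 through the default AWS credential chain, and that the spark-csv build in use provides CsvParser.csvRdd (worth checking against your version):

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.GetObjectRequest
import com.databricks.spark.csv.CsvParser

// Ship bare keys to the executors, not s3n:// URLs
val keyRDD = sc.parallelize(fileList.map(_.stripPrefix("s3n://" + s3Bucket + "/")))

val lineRDD = keyRDD.mapPartitions { keys =>
  // One client per partition; AmazonS3Client is not serializable
  val client = new AmazonS3Client()
  keys.flatMap { key =>
    val zis = new ZipInputStream(client.getObject(new GetObjectRequest(s3Bucket, key)).getObjectContent())
    zis.getNextEntry()  // assumption: a single CSV entry per archive
    val reader = new BufferedReader(new InputStreamReader(zis))
    // Materialize the lines eagerly so nothing reads the stream after it is closed
    val lines = Iterator.continually(reader.readLine()).takeWhile(_ != null).toList
    zis.close()
    lines
  }
}

// Caveat: every file contributes its own header line; with many files the
// repeated headers would need to be filtered out of lineRDD first.
val df = new CsvParser().withUseHeader(true).withInferSchema(true).csvRdd(sqlContext, lineRDD)

The mapPartitions/flatMap shape keeps the downloads on the executors, so the files would be fetched and unzipped in parallel rather than one at a time on the driver.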
Thanks,
Ben

> On Mar 9, 2016, at 10:10 AM, Xinh Huynh <xinh.hu...@gmail.com> wrote:
>
> Could you wrap the ZipInputStream in a List, since a subtype of TraversableOnce[?] is required?
>
> case (name, content) => List(new ZipInputStream(content.open))
>
> Xinh
>
> On Wed, Mar 9, 2016 at 7:07 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
> Hi Sabarish,
>
> I found a similar posting online where I should use the S3 listKeys.
> http://stackoverflow.com/questions/24029873/how-to-read-multiple-text-files-into-a-single-rdd
> Is this what you were thinking?
>
> And, your assumption is correct. The zipped CSV file contains only a single file. I found this posting.
> http://stackoverflow.com/questions/28969757/zip-support-in-apache-spark
> I see how to do the unzipping, but I cannot get it to work when running the code directly.
>
> ...
> import java.io.{ IOException, FileOutputStream, FileInputStream, File }
> import java.util.zip.{ ZipEntry, ZipInputStream }
> import org.apache.spark.input.PortableDataStream
>
> sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", accessKey)
> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secretKey)
>
> val zipFile = "s3n://events/2016/03/01/00/event-20160301.000000-4877ff81-928f-4da4-89b6-6d40a28d61c7.csv.zip"
> val zipFileRDD = sc.binaryFiles(zipFile).flatMap { case (name: String, content: PortableDataStream) => new ZipInputStream(content.open) }
>
> <console>:95: error: type mismatch;
>  found   : java.util.zip.ZipInputStream
>  required: TraversableOnce[?]
>        val zipFileRDD = sc.binaryFiles(zipFile).flatMap { case (name, content) => new ZipInputStream(content.open) }
>                                                                                   ^
>
> Thanks,
> Ben
>
>> On Mar 9, 2016, at 12:03 AM, Sabarish Sasidharan <sabarish....@gmail.com> wrote:
>>
>> You can use S3's listKeys API and do a diff between consecutive listKeys to identify what's new.
>>
>> Are there multiple files in each zip? Single file archives are processed just like text as long as it is one of the supported compression formats.
>>
>> Regards
>> Sab
>>
>> On Wed, Mar 9, 2016 at 10:33 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
>> I am wondering if anyone can help.
>>
>> Our company stores zipped CSV files in S3, which has been a big headache from the start. I was wondering if anyone has created a way to iterate through several subdirectories (s3n://events/2016/03/01/00, s3n://events/2016/03/01/01, etc.) in S3 to find the newest files and load them. It would be a big bonus to include the unzipping of the file in the process so that the CSV can be loaded directly into a dataframe for further processing. I’m pretty sure that the S3 part of this request is not uncommon. I would think the file being zipped is uncommon. If anyone can help, I would truly be grateful, for I am new to Scala and Spark. This would be a great help in learning.
>>
>> Thanks,
>> Ben
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org