Re: S3 Zip File Loading Advice

2016-03-15 Thread Benjamin Kim
Hi Xinh,

I tried to wrap it, but it still didn’t work. I got a
"java.util.ConcurrentModificationException".

All,

I have been trying and trying with some help from a coworker, but it’s slow
going. I have been able to gather a list of the S3 files I need to download.

### S3 Lists ###
import scala.collection.JavaConverters._
import java.util.ArrayList
import java.util.zip.{ZipEntry, ZipInputStream}
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{ObjectListing, S3ObjectSummary, 
ListObjectsRequest, GetObjectRequest}
import org.apache.commons.io.IOUtils
import org.joda.time.{DateTime, Period}
import org.joda.time.format.DateTimeFormat

val s3Bucket = "amg-events"

//Folder layout is yyyy/MM/dd/HH (e.g. 2016/03/01/00)
val formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH")
var then = DateTime.now()

var files = new ArrayList[String]

//S3 Client and List Object Request
val s3Client = new AmazonS3Client()
val listObjectsRequest = new ListObjectsRequest()
var objectListing: ObjectListing = null

//Your S3 Bucket
listObjectsRequest.setBucketName(s3Bucket)

var now = DateTime.now()
var range =
  Iterator.iterate(now.minusDays(1))(_.plus(Period.hours(1))).takeWhile(!_.isAfter(now))
range.foreach(ymdh => {
  //Your Folder path or Prefix
  listObjectsRequest.setPrefix(formatter.print(ymdh))

  //Adding s3:// to the paths and adding to a list
  do {
    objectListing = s3Client.listObjects(listObjectsRequest)
    for (objectSummary <- objectListing.getObjectSummaries().asScala) {
      if (objectSummary.getKey().contains(".csv.zip") &&
          objectSummary.getLastModified().after(then.toDate())) {
        //files.add(objectSummary.getKey())
        files.add("s3n://" + s3Bucket + "/" + objectSummary.getKey())
      }
    }
    listObjectsRequest.setMarker(objectListing.getNextMarker())
  } while (objectListing.isTruncated())
})
then = now

//Create a Scala collection from the Java list
val fileList = files.asScala

//Parallelize the Scala List
val fileRDD = sc.parallelize(fileList)

Now, I am trying to go through the list and download each file, unzip each file 
as it comes, and pass the ZipInputStream to the CSV parser. This is where I get 
stuck.

import org.apache.spark.sql.DataFrame

var df: DataFrame = null
for (file <- fileList) {
  // fileList entries carry the "s3n://<bucket>/" prefix, so strip it back to the raw key
  val key = file.stripPrefix("s3n://" + s3Bucket + "/")
  val zipfile = s3Client.getObject(new GetObjectRequest(s3Bucket, key)).getObjectContent()
  val zis = new ZipInputStream(zipfile)
  var ze = zis.getNextEntry()
//  val fileDf = sqlContext.read.format("com.databricks.spark.csv")
//    .option("header", "true").option("inferSchema", "true").load(zis)
//  if (df != null) {
//    df = df.unionAll(fileDf)
//  } else {
//    df = fileDf
//  }
}

I don’t know if I am doing this right. I also read that parallelizing fileList
would allow the files to be retrieved in parallel, but I don’t know how to
proceed from here.
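
For reference, a minimal, untested sketch of one way to push this onto the
executors. It assumes single-file archives, an AmazonS3Client built inside each
task (instead of one captured from the driver), and that the spark-csv 1.x
CsvParser.csvRdd method is available; none of this comes from the thread, so
treat it only as a starting point.

import java.util.zip.ZipInputStream
import scala.io.Source
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.GetObjectRequest
import com.databricks.spark.csv.CsvParser

// One RDD of CSV lines for all files; the client is built per task so that
// nothing non-serializable is captured in the closure.
val csvLines = fileRDD.flatMap { path =>
  val rest   = path.stripPrefix("s3n://")
  val bucket = rest.takeWhile(_ != '/')
  val key    = rest.drop(bucket.length + 1)
  val s3     = new AmazonS3Client()
  val zis    = new ZipInputStream(
    s3.getObject(new GetObjectRequest(bucket, key)).getObjectContent())
  zis.getNextEntry()                                        // single CSV per archive assumed
  val lines = Source.fromInputStream(zis).getLines().toList // drain before the task ends
  zis.close()
  lines.drop(1)                                             // drop each file's header row
}

// With the headers dropped above, parse without a header row; columns come out
// as C0, C1, ... and can be renamed with toDF afterwards.
val df = new CsvParser()
  .withUseHeader(false)
  .withInferSchema(true)
  .csvRdd(sqlContext, csvLines)

If CsvParser.csvRdd is not available in the spark-csv version being used, the
same csvLines RDD can be split into columns by hand and converted with toDF
instead.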

If you can help, I would be grateful.

Thanks,
Ben


> On Mar 9, 2016, at 10:10 AM, Xinh Huynh  wrote:
> 
> Could you wrap the ZipInputStream in a List, since a subtype of 
> TraversableOnce[?] is required?
> 
> case (name, content) => List(new ZipInputStream(content.open))
> 
> Xinh
> 
> On Wed, Mar 9, 2016 at 7:07 AM, Benjamin Kim  wrote:
> Hi Sabarish,
> 
> I found a similar posting online where I should use the S3 listKeys.
> http://stackoverflow.com/questions/24029873/how-to-read-multiple-text-files-into-a-single-rdd
> Is this what you were thinking?
> 
> And, your assumption is correct. The zipped CSV file contains only a single
> file. I found this posting:
> http://stackoverflow.com/questions/28969757/zip-support-in-apache-spark
> I see how to do the unzipping, but I cannot get it to work when running the
> code directly.
> 
> ...
> import java.io.{ IOException, FileOutputStream, FileInputStream, File }
> import java.util.zip.{ ZipEntry, ZipInputStream }
> import org.apache.spark.input.PortableDataStream
> 
> sc.hadoopConfiguration.set("fs.s3n.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", accessKey)
> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secretKey)
> 
> val zipFile =
> "s3n://events/2016/03/01/00/event-20160301.00-4877ff81-928f-4da4-89b6-6d40a28d61c7.csv.zip"
> val zipFileRDD = sc.binaryFiles(zipFile).flatMap { case (name: String, 
> content: PortableDataStream) => new ZipInputStream(content.open) }
> 
> <console>:95: error: type mismatch;
>  found   : java.util.zip.ZipInputStream
>  required: TraversableOnce[?]
>  val zipFileRDD = sc.binaryFiles(zipFile).flatMap { case (name, 
> content) => new ZipInputStream(content.open) }
>   
>   ^
> 
> Thanks,
> Ben
> 
>> On Mar 9, 2016, at 12:03 AM, Sabarish Sasidharan  wrote:

Re: S3 Zip File Loading Advice

2016-03-09 Thread Xinh Huynh
Could you wrap the ZipInputStream in a List, since a subtype of
TraversableOnce[?] is required?

case (name, content) => List(new ZipInputStream(content.open))
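
Another variant (a rough, untested sketch, not something from this thread) is to
flatMap straight to the decompressed lines instead of the stream itself, so the
result is an RDD[String]; it assumes single-file archives and that binaryFiles
accepts the wildcard path shown.

import java.util.zip.ZipInputStream
import scala.io.Source
import org.apache.spark.input.PortableDataStream

// Each archive is opened once and only its single entry's lines are emitted.
val linesRDD = sc.binaryFiles("s3n://events/2016/03/01/00/*.csv.zip")
  .flatMap { case (name: String, content: PortableDataStream) =>
    val zis = new ZipInputStream(content.open)
    zis.getNextEntry()                        // single-file archives assumed
    Source.fromInputStream(zis).getLines().toList
  }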

Xinh

On Wed, Mar 9, 2016 at 7:07 AM, Benjamin Kim  wrote:

> Hi Sabarish,
>
> I found a similar posting online where I should use the S3 listKeys.
> http://stackoverflow.com/questions/24029873/how-to-read-multiple-text-files-into-a-single-rdd.
> Is this what you were thinking?
>
> And, your assumption is correct. The zipped CSV file contains only a
> single file. I found this posting.
> http://stackoverflow.com/questions/28969757/zip-support-in-apache-spark.
> I see how to do the unzipping, but I cannot get it to work when running the
> code directly.
>
> ...
> import java.io.{ IOException, FileOutputStream, FileInputStream, File }
> import java.util.zip.{ ZipEntry, ZipInputStream }
> import org.apache.spark.input.PortableDataStream
>
>
> sc.hadoopConfiguration.set("fs.s3n.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", accessKey)
> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secretKey)
>
> val zipFile = "
> s3n://events/2016/03/01/00/event-20160301.00-4877ff81-928f-4da4-89b6-6d40a28d61c7.csv.zip
> "
> val zipFileRDD = sc.binaryFiles(zipFile).flatMap { case (name: String,
> content: PortableDataStream) => new ZipInputStream(content.open) }
>
> <console>:95: error: type mismatch;
>  found   : java.util.zip.ZipInputStream
>  required: TraversableOnce[?]
>  val zipFileRDD = sc.binaryFiles(zipFile).flatMap { case (name,
> content) => new ZipInputStream(content.open) }
>
>   ^
>
> Thanks,
> Ben
>
> On Mar 9, 2016, at 12:03 AM, Sabarish Sasidharan 
> wrote:
>
> You can use S3's listKeys API and do a diff between consecutive listKeys
> to identify what's new.
>
> Are there multiple files in each zip? Single file archives are processed
> just like text as long as it is one of the supported compression formats.
>
> Regards
> Sab
>
> On Wed, Mar 9, 2016 at 10:33 AM, Benjamin Kim  wrote:
>
>> I am wondering if anyone can help.
>>
>> Our company stores zipped CSV files in S3, which has been a big headache
>> from the start. I was wondering if anyone has created a way to iterate
>> through several subdirectories (s3n://events/2016/03/01/00,
>> s3n://2016/03/01/01, etc.) in S3 to find the newest files and load them.
>> It would be a big bonus to include the unzipping of the file in the process
>> so that the CSV can be loaded directly into a dataframe for further
>> processing. I’m pretty sure that the S3 part of this request is not
>> uncommon. I would think the file being zipped is uncommon. If anyone can
>> help, I would truly be grateful for I am new to Scala and Spark. This would
>> be a great help in learning.
>>
>> Thanks,
>> Ben


Re: S3 Zip File Loading Advice

2016-03-09 Thread Benjamin Kim
Hi Sabarish,

I found a similar posting online where I should use the S3 listKeys. 
http://stackoverflow.com/questions/24029873/how-to-read-multiple-text-files-into-a-single-rdd.
 Is this what you were thinking?

And, your assumption is correct. The zipped CSV file contains only a single 
file. I found this posting. 
http://stackoverflow.com/questions/28969757/zip-support-in-apache-spark. I see 
how to do the unzipping, but I cannot get it to work when running the code 
directly.

...
import java.io.{ IOException, FileOutputStream, FileInputStream, File }
import java.util.zip.{ ZipEntry, ZipInputStream }
import org.apache.spark.input.PortableDataStream

sc.hadoopConfiguration.set("fs.s3n.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", accessKey)
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secretKey)

val zipFile = 
"s3n://events/2016/03/01/00/event-20160301.00-4877ff81-928f-4da4-89b6-6d40a28d61c7.csv.zip"
val zipFileRDD = sc.binaryFiles(zipFile).flatMap { case (name: String, content: 
PortableDataStream) => new ZipInputStream(content.open) }

<console>:95: error: type mismatch;
 found   : java.util.zip.ZipInputStream
 required: TraversableOnce[?]
 val zipFileRDD = sc.binaryFiles(zipFile).flatMap { case (name, 
content) => new ZipInputStream(content.open) }

^

Thanks,
Ben

> On Mar 9, 2016, at 12:03 AM, Sabarish Sasidharan  
> wrote:
> 
> You can use S3's listKeys API and do a diff between consecutive listKeys to 
> identify what's new.
> 
> Are there multiple files in each zip? Single file archives are processed just 
> like text as long as it is one of the supported compression formats.
> 
> Regards
> Sab
> 
> On Wed, Mar 9, 2016 at 10:33 AM, Benjamin Kim  wrote:
> I am wondering if anyone can help.
> 
> Our company stores zipped CSV files in S3, which has been a big headache from 
> the start. I was wondering if anyone has created a way to iterate through 
> several subdirectories (s3n://events/2016/03/01/00, s3n://2016/03/01/01, 
> etc.) in S3 to find the newest files and load them. It would be a big bonus 
> to include the unzipping of the file in the process so that the CSV can be 
> loaded directly into a dataframe for further processing. I’m pretty sure that 
> the S3 part of this request is not uncommon. I would think the file being 
> zipped is uncommon. If anyone can help, I would truly be grateful for I am 
> new to Scala and Spark. This would be a great help in learning.
> 
> Thanks,
> Ben



Re: S3 Zip File Loading Advice

2016-03-09 Thread Jörn Franke

Oozie may be able to do this for you and integrate with Spark.

> On 09 Mar 2016, at 06:03, Benjamin Kim  wrote:
> 
> I am wondering if anyone can help.
> 
> Our company stores zipped CSV files in S3, which has been a big headache from 
> the start. I was wondering if anyone has created a way to iterate through 
> several subdirectories (s3n://events/2016/03/01/00, s3n://2016/03/01/01, 
> etc.) in S3 to find the newest files and load them. It would be a big bonus 
> to include the unzipping of the file in the process so that the CSV can be 
> loaded directly into a dataframe for further processing. I’m pretty sure that 
> the S3 part of this request is not uncommon. I would think the file being 
> zipped is uncommon. If anyone can help, I would truly be grateful for I am 
> new to Scala and Spark. This would be a great help in learning.
> 
> Thanks,
> Ben




Re: S3 Zip File Loading Advice

2016-03-09 Thread Sabarish Sasidharan
You can use S3's listKeys API and do a diff between consecutive listKeys to
identify what's new.

Are there multiple files in each zip? Single file archives are processed
just like text as long as it is one of the supported compression formats.
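
For illustration, a minimal untested sketch of that listing-and-diff approach
with the AWS Java SDK; the bucket and prefix are placeholders, and persisting
the previous run's key set is left out.

import scala.collection.JavaConverters._
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ListObjectsRequest

// List every key under a prefix, following truncated result pages.
def listKeys(bucket: String, prefix: String): Set[String] = {
  val s3 = new AmazonS3Client()
  val request = new ListObjectsRequest().withBucketName(bucket).withPrefix(prefix)
  var keys = Set.empty[String]
  var listing = s3.listObjects(request)
  keys ++= listing.getObjectSummaries.asScala.map(_.getKey)
  while (listing.isTruncated) {
    request.setMarker(listing.getNextMarker)
    listing = s3.listObjects(request)
    keys ++= listing.getObjectSummaries.asScala.map(_.getKey)
  }
  keys
}

// Diff the current listing against the keys seen on the previous run.
val previousKeys: Set[String] = Set.empty      // e.g. persisted from the last run
val currentKeys = listKeys("events", "2016/03/01/")
val newKeys = currentKeys -- previousKeys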

Regards
Sab

On Wed, Mar 9, 2016 at 10:33 AM, Benjamin Kim  wrote:

> I am wondering if anyone can help.
>
> Our company stores zipped CSV files in S3, which has been a big headache
> from the start. I was wondering if anyone has created a way to iterate
> through several subdirectories (s3n://events/2016/03/01/00,
> s3n://2016/03/01/01, etc.) in S3 to find the newest files and load them. It
> would be a big bonus to include the unzipping of the file in the process so
> that the CSV can be loaded directly into a dataframe for further
> processing. I’m pretty sure that the S3 part of this request is not
> uncommon. I would think the file being zipped is uncommon. If anyone can
> help, I would truly be grateful for I am new to Scala and Spark. This would
> be a great help in learning.
>
> Thanks,
> Ben


Re: S3 Zip File Loading Advice

2016-03-08 Thread Hemant Bhanawat
https://issues.apache.org/jira/browse/SPARK-3586 talks about creating a
file DStream that can monitor for new files recursively, but this
functionality has not been added yet.

I don't see an easy way out. You will have to create your folders based on a
timeline (it looks like you are already doing that) and run a new job over
the new folders created in each interval. This will have to be automated
using an external script.
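
For example (a hedged sketch only; the yyyy/MM/dd/HH path layout comes from the
original question and the hourly schedule is an assumption), each scheduled run
could compute the previous interval's folder and read just that prefix:

import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

// Derive the previous hour's folder and read only that folder; an external
// scheduler (cron, Oozie, ...) would launch this once per interval.
val fmt = DateTimeFormat.forPattern("yyyy/MM/dd/HH")
val lastHour = DateTime.now().minusHours(1)
val path = s"s3n://events/${fmt.print(lastHour)}/*.csv.zip"
val newFiles = sc.binaryFiles(path)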

Hemant Bhanawat 
www.snappydata.io

On Wed, Mar 9, 2016 at 10:33 AM, Benjamin Kim  wrote:

> I am wondering if anyone can help.
>
> Our company stores zipped CSV files in S3, which has been a big headache
> from the start. I was wondering if anyone has created a way to iterate
> through several subdirectories (s3n://events/2016/03/01/00,
> s3n://2016/03/01/01, etc.) in S3 to find the newest files and load them. It
> would be a big bonus to include the unzipping of the file in the process so
> that the CSV can be loaded directly into a dataframe for further
> processing. I’m pretty sure that the S3 part of this request is not
> uncommon. I would think the file being zipped is uncommon. If anyone can
> help, I would truly be grateful for I am new to Scala and Spark. This would
> be a great help in learning.
>
> Thanks,
> Ben


S3 Zip File Loading Advice

2016-03-08 Thread Benjamin Kim
I am wondering if anyone can help.

Our company stores zipped CSV files in S3, which has been a big headache from
the start. I was wondering if anyone has created a way to iterate through
several subdirectories (s3n://events/2016/03/01/00, s3n://events/2016/03/01/01,
etc.) in S3 to find the newest files and load them. It would be a big bonus to
include the unzipping of the files in the process so that the CSVs can be loaded
directly into a dataframe for further processing. I’m pretty sure that the S3
part of this request is not uncommon; I would think the files being zipped is
the uncommon part. If anyone can help, I would truly be grateful, for I am new
to Scala and Spark. This would be a great help in learning.

Thanks,
Ben
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org