S3 Reader / Writer for Apache Streams
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/544a0c92
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/544a0c92
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/544a0c92

Branch: refs/heads/master
Commit: 544a0c92ecfa1ab7e1334721abeeb881cd25cd3f
Parents: 741a454
Author: Matthew Hager <matthew.ha...@gmail.com>
Authored: Fri May 2 13:03:50 2014 -0500
Committer: Matthew Hager <matthew.ha...@gmail.com>
Committed: Fri May 2 13:04:06 2014 -0500

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |   1 +
 streams-contrib/streams-amazon-aws/pom.xml      |  67 +++++
 .../streams-persist-s3/pom.xml                  |  87 +++++++
 .../org/apache/streams/s3/S3Configurator.java   |  64 +++++
 .../streams/s3/S3ObjectInputStreamWrapper.java  | 111 ++++++++
 .../streams/s3/S3OutputStreamWrapper.java       | 128 +++++++++
 .../org/apache/streams/s3/S3PersistReader.java  | 141 ++++++++++
 .../apache/streams/s3/S3PersistReaderTask.java  |  87 +++++++
 .../org/apache/streams/s3/S3PersistWriter.java  | 257 +++++++++++++++++++
 .../apache/streams/s3/S3PersistWriterTask.java  |  37 +++
 .../org/apache/streams/s3/S3Configuration.json  |  25 ++
 .../streams/s3/S3ReaderConfiguration.json       |  14 +
 .../streams/s3/S3WriterConfiguration.json       |  28 ++
 13 files changed, 1047 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index d80fc63..c7bbdf4 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -44,6 +44,7 @@
         <module>streams-persist-hdfs</module>
         <module>streams-persist-kafka</module>
         <module>streams-persist-mongo</module>
+        <module>streams-amazon-aws</module>
         <!--<module>streams-processor-lucene</module>-->
         <!--<module>streams-processor-tika</module>-->
         <module>streams-processor-urls</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/pom.xml b/streams-contrib/streams-amazon-aws/pom.xml
new file mode 100644
index 0000000..57a67cb
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>streams-contrib</artifactId> + <groupId>org.apache.streams</groupId> + <version>0.1-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>streams-amazon-aws</artifactId> + + <packaging>pom</packaging> + <name>streams-amazon-aws</name> + + <properties> + + </properties> + + <modules> + <module>streams-persist-s3</module> + </modules> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk</artifactId> + <version>1.7.5</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-config</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-pojo</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + </dependencyManagement> +</project> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml b/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml new file mode 100644 index 0000000..4e9b9b1 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml @@ -0,0 +1,87 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>streams-amazon-aws</artifactId> + <groupId>org.apache.streams</groupId> + <version>0.1-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>streams-persist-s3</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-config</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-pojo</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk</artifactId> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-util</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.8</version> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/jsonschema2pojo</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.jsonschema2pojo</groupId> + 
<artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/s3/S3Configuration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.s3.pojo</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>true</useJodaDates>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
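For orientation: the plugin execution above turns each of the three schemas below into a plain Java bean in org.apache.streams.s3. The snippet that follows is an abridged sketch of roughly what jsonschema2pojo emits for S3Configuration, not the literal generated source; the exact shape depends on the plugin version, and generateBuilders=true additionally produces chainable with* methods.

// Abridged sketch of the generated configuration bean, for reference only.
public class S3Configuration implements java.io.Serializable {

    private String key;        // "Your Amazon Key"
    private String secretKey;  // "Your Amazon Secret Key"
    private String bucket;     // the target bucket
    private String protocol;   // "http" or "https"

    public String getBucket() { return bucket; }
    public void setBucket(String bucket) { this.bucket = bucket; }

    // generateBuilders=true also emits chainable variants:
    public S3Configuration withBucket(String bucket) {
        this.bucket = bucket;
        return this;
    }

    // ... equivalent accessors for key, secretKey and protocol ...
}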
s3.getString("writerFilePrefix") : "default"); + + if(s3.hasPath("maxFileSize")) + s3WriterConfiguration.setMaxFileSize((long)s3.getInt("maxFileSize")); + if(s3.hasPath("chunk")) + s3WriterConfiguration.setChunk(s3.getBoolean("chunk")); + + return s3WriterConfiguration; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java new file mode 100644 index 0000000..2a2dba0 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java @@ -0,0 +1,111 @@ +package org.apache.streams.s3; + +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; + +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; + +/** + * There is a stupid nuance associated with reading portions of files in S3. Everything occurs over + * an Apache HTTP client object. Apache defaults to re-using the stream. So, if you only want to read + * a small portion of the file. You must first "abort" the stream, then close. Otherwise, Apache will + * exhaust the stream and transfer a ton of data attempting to do so. + * + * + * Author Smashew + * Date 2014-04-11 + * + * After a few more days and some demos that had some issues with concurrency and high user load. This + * was also discovered. There is an issue with the S3Object's HTTP connection not being released back + * to the connection pool (until it times out) even once the item is garbage collected. So.... 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
new file mode 100644
index 0000000..2a2dba0
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
@@ -0,0 +1,111 @@
+package org.apache.streams.s3;
+
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * There is an unfortunate nuance associated with reading portions of files in S3. Everything occurs over
+ * an Apache HTTP client object, and Apache defaults to re-using the stream. So, if you only want to read
+ * a small portion of the file, you must first "abort" the stream, then close it. Otherwise, the client
+ * will exhaust the stream, transferring a ton of data in the attempt.
+ *
+ * Author Smashew
+ * Date 2014-04-11
+ *
+ * After a few more days, and some demos that hit issues under concurrency and high user load, a second
+ * problem was discovered: the S3Object's HTTP connection is not released back to the connection pool
+ * (until it times out), even once the object is garbage collected. Hence the finalize() safeguard below.
+ *
+ * Reference:
+ * http://stackoverflow.com/questions/17782937/connectionpooltimeoutexception-when-iterating-objects-in-s3
+ */
+public class S3ObjectInputStreamWrapper extends InputStream
+{
+    private final static Logger LOGGER = LoggerFactory.getLogger(S3ObjectInputStreamWrapper.class);
+
+    private final S3Object s3Object;
+    private final S3ObjectInputStream is;
+    private boolean isClosed = false;
+
+    public S3ObjectInputStreamWrapper(S3Object s3Object) {
+        this.s3Object = s3Object;
+        this.is = this.s3Object.getObjectContent();
+    }
+
+    public int hashCode() { return this.is.hashCode(); }
+    public boolean equals(Object obj) { return this.is.equals(obj); }
+    public String toString() { return this.is.toString(); }
+    public int read() throws IOException { return this.is.read(); }
+    public int read(byte[] b) throws IOException { return this.is.read(b); }
+    public int read(byte[] b, int off, int len) throws IOException { return this.is.read(b, off, len); }
+    public long skip(long n) throws IOException { return this.is.skip(n); }
+    public int available() throws IOException { return this.is.available(); }
+    public boolean markSupported() { return this.is.markSupported(); }
+    public synchronized void mark(int readlimit) { this.is.mark(readlimit); }
+    public synchronized void reset() throws IOException { this.is.reset(); }
+
+    public void close() throws IOException {
+        ensureEverythingIsReleased();
+    }
+
+    public void ensureEverythingIsReleased()
+    {
+        if(this.isClosed)
+            return;
+
+        // THIS WHOLE CLASS IS JUST FOR THIS FUNCTION!
+        // Amazon S3 - HTTP exhaust-all-file-contents issue
+        try {
+            this.is.abort();
+        }
+        catch(Exception e) {
+            LOGGER.warn("S3Object[{}]: Issue Aborting Stream - {}", s3Object.getKey(), e.getMessage());
+        }
+
+        // close the input stream safely
+        closeSafely(this.is);
+
+        // This corrects the issue with open HTTP connections
+        closeSafely(this.s3Object);
+        this.isClosed = true;
+    }
+
+    private static void closeSafely(Closeable is) {
+        try {
+            if(is != null)
+                is.close();
+        } catch(Exception e) {
+            LOGGER.warn("S3InputStreamWrapper: Issue Closing Closeable - {}", e.getMessage());
+        }
+    }
+
+    protected void finalize() throws Throwable
+    {
+        try
+        {
+            ensureEverythingIsReleased();
+            super.finalize();
+        } catch(Exception e) {
+            // this should never be called, just being very cautious
+            LOGGER.warn("S3InputStreamWrapper: Issue Releasing Connections on Finalize - {}", e.getMessage());
+        }
+    }
+
+}
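To make the nuance described in the class comment concrete, here's a sketch (not part of the commit; bucket and key names are hypothetical) of a partial read. Without the abort-before-close ordering that close() performs, the HTTP client would drain the rest of the object over the wire:

import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectRequest;

import java.io.IOException;
import java.io.InputStream;

public class RangedReadSketch {
    public static void main(String[] args) throws IOException {
        AmazonS3Client client = new AmazonS3Client(); // assumes default credentials
        // Only fetch the first kilobyte of a (hypothetical) multi-gigabyte object.
        InputStream is = new S3ObjectInputStreamWrapper(
                client.getObject(new GetObjectRequest("my-bucket", "big/file.tsv").withRange(0, 1023)));
        try {
            byte[] header = new byte[1024];
            int read = is.read(header);
            System.out.println("read " + read + " bytes");
        } finally {
            is.close(); // aborts the underlying stream first, then releases the connection
        }
    }
}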
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
new file mode 100644
index 0000000..8f55983
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
@@ -0,0 +1,128 @@
+package org.apache.streams.s3;
+
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.Upload;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.Map;
+
+/**
+ * Author: Smashew
+ * Date: 2014-04-14
+ *
+ * Description:
+ * This class uses a ByteArrayOutputStream to ensure files are written to S3 properly.
+ *
+ * There is a way to upload data in chunks (5mb or so) apiece, but the multi-part upload
+ * is kind of a PITA to deal with.
+ *
+ * // TODO: This should be refactored to allow a user to specify if they want one large file instead of many small ones
+ */
+public class S3OutputStreamWrapper extends OutputStream
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(S3OutputStreamWrapper.class);
+
+    private final AmazonS3Client amazonS3Client;
+    private final String bucketName;
+    private final String path;
+    private final String fileName;
+    private ByteArrayOutputStream outputStream;
+    private final Map<String, String> metaData;
+
+    private boolean isClosed = false;
+
+    public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, String path, String fileName, Map<String, String> metaData) throws IOException
+    {
+        this.amazonS3Client = amazonS3Client;
+        this.bucketName = bucketName;
+        this.path = path;
+        this.fileName = fileName;
+        this.metaData = metaData;
+        this.outputStream = new ByteArrayOutputStream();
+    }
+
+    /*
+     * The methods that are overridden to support the 'OutputStream' object.
+     */
+
+    public void write(int b) throws IOException { this.outputStream.write(b); }
+    public void write(byte[] b) throws IOException { this.outputStream.write(b); }
+    public void write(byte[] b, int off, int len) throws IOException { this.outputStream.write(b, off, len); }
+    public void flush() throws IOException { this.outputStream.flush(); }
+
+    /**
+     * Whenever the output stream is closed we are going to kick the ByteArrayOutputStream off to Amazon S3.
+     * @throws IOException
+     *          Exception thrown from the underlying output stream
+     */
+    public void close() throws IOException {
+        if(!isClosed)
+        {
+            try
+            {
+                this.addFile();
+                this.outputStream.close();
+                this.outputStream = null;
+            }
+            catch(Exception e) {
+                LOGGER.warn("There was an error adding the file to S3 - {}", e.getMessage());
+            }
+            finally {
+                // we are done here.
+                this.isClosed = true;
+            }
+        }
+    }
+
+    private void addFile() throws Exception {
+
+        InputStream is = new ByteArrayInputStream(this.outputStream.toByteArray());
+        int contentLength = outputStream.size();
+
+        TransferManager transferManager = new TransferManager(amazonS3Client);
+        ObjectMetadata metadata = new ObjectMetadata();
+        metadata.setExpirationTime(DateTime.now().plusDays(365*3).toDate());
+        metadata.setContentLength(contentLength);
+
+        metadata.addUserMetadata("writer", "org.apache.streams");
+
+        for(String s : metaData.keySet())
+            metadata.addUserMetadata(s, metaData.get(s));
+
+        String fileNameToWrite = path + fileName;
+        Upload upload = transferManager.upload(bucketName, fileNameToWrite, is, metadata);
+        try {
+            upload.waitForUploadResult();
+
+            is.close();
+            transferManager.shutdownNow(false);
+            LOGGER.info("S3 File Close[{} kb] - {}", contentLength / 1024, path + fileName);
+        } catch (Exception e) {
+            LOGGER.warn("There was an issue waiting for the upload to complete - {}", e.getMessage());
+        }
+    }
+
+}
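Usage sketch (not part of the commit): the wrapper behaves like any OutputStream, but nothing touches S3 until close(), when the whole in-memory buffer is handed to a TransferManager. The bucket and file names below are placeholders:

import com.amazonaws.services.s3.AmazonS3Client;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.HashMap;

public class OutputWrapperSketch {
    public static void main(String[] args) throws IOException {
        AmazonS3Client client = new AmazonS3Client(); // assumes default credentials
        Writer writer = new OutputStreamWriter(new S3OutputStreamWrapper(
                client, "my-bucket", "data/streams/", "demo-1.tsv", new HashMap<String, String>()));
        writer.write("id-1\t2014-05-02T13:03:50Z\t{}\t{\"body\":\"hello\"}\n");
        writer.close(); // buffer -> ByteArrayInputStream -> TransferManager upload
    }
}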
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
new file mode 100644
index 0000000..a987a47
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -0,0 +1,141 @@
+package org.apache.streams.s3;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.S3ClientOptions;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Queues;
+import org.apache.streams.core.*;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class S3PersistReader implements StreamsPersistReader, DatumStatusCountable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(S3PersistReader.class);
+    public final static String STREAMS_ID = "S3PersistReader";
+    protected final static char DELIMITER = '\t';
+
+    private S3ReaderConfiguration s3ReaderConfiguration;
+    private AmazonS3Client amazonS3Client;
+    private ObjectMapper mapper = new ObjectMapper();
+    private Collection<String> files;
+    private ExecutorService executor;
+    protected volatile Queue<StreamsDatum> persistQueue;
+
+    protected DatumStatusCounter countersTotal = new DatumStatusCounter();
+    protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
+
+    public AmazonS3Client getAmazonS3Client() { return this.amazonS3Client; }
+    public S3ReaderConfiguration getS3ReaderConfiguration() { return this.s3ReaderConfiguration; }
+    public String getBucketName() { return this.s3ReaderConfiguration.getBucket(); }
+    public StreamsResultSet readNew(BigInteger sequence) { return null; }
+    public StreamsResultSet readRange(DateTime start, DateTime end) { return null; }
+    public DatumStatusCounter getDatumStatusCounter() { return countersTotal; }
+    public Collection<String> getFiles() { return this.files; }
+
+    public S3PersistReader(S3ReaderConfiguration s3ReaderConfiguration) {
+        this.s3ReaderConfiguration = s3ReaderConfiguration;
+    }
+
+    public void prepare(Object configurationObject) {
+        // Connect to S3
+        synchronized (this)
+        {
+            // Create the credentials object
+            AWSCredentials credentials = new BasicAWSCredentials(s3ReaderConfiguration.getKey(), s3ReaderConfiguration.getSecretKey());
+
+            ClientConfiguration clientConfig = new ClientConfiguration();
+            clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toUpperCase()));
+
+            // We want path style access
+            S3ClientOptions clientOptions = new S3ClientOptions();
+            clientOptions.setPathStyleAccess(true);
+
+            this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
+            this.amazonS3Client.setS3ClientOptions(clientOptions);
+        }
+
+        final ListObjectsRequest request = new ListObjectsRequest()
+                .withBucketName(this.s3ReaderConfiguration.getBucket())
+                .withPrefix(s3ReaderConfiguration.getReaderPath())
+                .withMaxKeys(50);
+
+        ObjectListing listing = this.amazonS3Client.listObjects(request);
+
+        this.files = new ArrayList<String>();
+
+        /*
+         * If listing this path yields common prefixes, then we are dealing with a
+         * directory; if it does not, we are most likely dealing with a single file.
+         */
+        if(listing.getCommonPrefixes().size() > 0) {
+            // Handle the 'directory' use case
+            do
+            {
+                for (String file : listing.getCommonPrefixes())
+                    this.files.add(file);
+
+                // get the next batch.
+                listing = this.amazonS3Client.listNextBatchOfObjects(listing);
+            } while (listing.isTruncated());
+        }
+        else {
+            // handle the single file use-case
+            this.files.add(s3ReaderConfiguration.getReaderPath());
+        }
+
+        if(this.files.isEmpty())
+            LOGGER.error("There are no files to read");
+
+        this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+        this.executor = Executors.newSingleThreadExecutor();
+    }
+
+    public void cleanUp() { }
+
+    public StreamsResultSet readAll() {
+        startStream();
+        return new StreamsResultSet(persistQueue);
+    }
+
+    public void startStream() {
+        LOGGER.debug("startStream");
+        executor.submit(new S3PersistReaderTask(this));
+    }
+
+    public StreamsResultSet readCurrent() {
+
+        StreamsResultSet current;
+
+        synchronized( S3PersistReader.class ) {
+            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+            current.setCounter(new DatumStatusCounter());
+            current.getCounter().add(countersCurrent);
+            countersTotal.add(countersCurrent);
+            countersCurrent = new DatumStatusCounter();
+            persistQueue.clear();
+        }
+        return current;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
new file mode 100644
index 0000000..70015fb
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
@@ -0,0 +1,87 @@
+package org.apache.streams.s3;
+
+import com.google.common.base.Strings;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.InputStreamReader;
+
+public class S3PersistReaderTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistReaderTask.class);
+
+    private S3PersistReader reader;
+
+    public S3PersistReaderTask(S3PersistReader reader) {
+        this.reader = reader;
+    }
+
+    @Override
+    public void run() {
+
+        for(String file : reader.getFiles())
+        {
+            // Create our buffered reader
+            S3ObjectInputStreamWrapper is = new S3ObjectInputStreamWrapper(reader.getAmazonS3Client().getObject(reader.getBucketName(), file));
+            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));
+            LOGGER.info("Reading: {} ", file);
+
+            String line = "";
+            try {
+                while((line = bufferedReader.readLine()) != null)
+                {
+                    if( !Strings.isNullOrEmpty(line) )
+                    {
+                        reader.countersCurrent.incrementAttempt();
+                        // lines are tab-delimited: [0]=id, [1]=timestamp, [2]=metadata, [3]=document
+                        String[] fields = line.split(Character.toString(S3PersistReader.DELIMITER));
+                        StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
+                        write( entry );
+                        reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                LOGGER.warn("Error reading {}: {}", file, e.getMessage());
+                reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
+            }
+
+            LOGGER.info("Completed: {}", file);
+
+            try {
+                closeSafely(file, is);
+            } catch (Exception e) {
+                LOGGER.error(e.getMessage());
+            }
+        }
+    }
+
+    private static void closeSafely(String file, Closeable closeable) {
+        try {
+            closeable.close();
+        }
+        catch(Exception e) {
+            LOGGER.error("There was an issue closing file: {}", file);
+        }
+    }
+
+    private void write( StreamsDatum entry ) {
+        boolean success;
+        do {
+            synchronized( S3PersistReader.class ) {
+                success = reader.persistQueue.offer(entry);
+            }
+            // if the queue is full, yield and retry until it accepts the datum
+            Thread.yield();
+        }
+        while( !success );
+    }
+
+}
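Putting the reader and its task together (a sketch, not part of the commit; the setters come from the jsonschema2pojo-generated configuration beans, and the values are placeholders):

import org.apache.streams.core.StreamsResultSet;

public class ReaderSketch {
    public static void main(String[] args) {
        S3ReaderConfiguration config = new S3ReaderConfiguration();
        config.setBucket("my-bucket");
        config.setKey("AKIAEXAMPLE");
        config.setSecretKey("example-secret");
        config.setProtocol("https");
        config.setReaderPath("data/streams/");

        S3PersistReader reader = new S3PersistReader(config);
        reader.prepare(null);                          // builds the client and lists files under readerPath
        StreamsResultSet resultSet = reader.readAll(); // submits S3PersistReaderTask on the executor

        // S3PersistReaderTask now fills resultSet's queue in the background;
        // a streams runtime (or periodic readCurrent() calls) drains it.
    }
}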
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
new file mode 100644
index 0000000..c46ff03
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -0,0 +1,257 @@
+package org.apache.streams.s3;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.S3ClientOptions;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.streams.core.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountable
+{
+    public final static String STREAMS_ID = "S3PersistWriter";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(S3PersistWriter.class);
+
+    private final static char DELIMITER = '\t';
+
+    private ObjectMapper objectMapper;
+    private AmazonS3Client amazonS3Client;
+    private S3WriterConfiguration s3WriterConfiguration;
+    private final List<String> writtenFiles = new ArrayList<String>();
+
+    private final AtomicLong totalBytesWritten = new AtomicLong();
+    private AtomicLong bytesWrittenThisFile = new AtomicLong();
+
+    private final AtomicInteger totalRecordsWritten = new AtomicInteger();
+    private AtomicInteger fileLineCounter = new AtomicInteger();
+
+    private Map<String, String> objectMetaData = new HashMap<String, String>() {{
+        put("line[0]", "id");
+        put("line[1]", "timeStamp");
+        put("line[2]", "metaData");
+        put("line[3]", "document");
+    }};
+
+    private OutputStreamWriter currentWriter = null;
+    protected volatile Queue<StreamsDatum> persistQueue;
+
+    public AmazonS3Client getAmazonS3Client() { return this.amazonS3Client; }
+    public S3WriterConfiguration getS3WriterConfiguration() { return this.s3WriterConfiguration; }
+    public List<String> getWrittenFiles() { return this.writtenFiles; }
+    public Map<String, String> getObjectMetaData() { return this.objectMetaData; }
+    public ObjectMapper getObjectMapper() { return this.objectMapper; }
+
+    public void setObjectMapper(ObjectMapper mapper) { this.objectMapper = mapper; }
+    public void setObjectMetaData(Map<String, String> val) { this.objectMetaData = val; }
+
+    /**
+     * Constructor that accepts a pre-existing AmazonS3Client, to encourage re-use.
+     * @param amazonS3Client
+     *          If you supply an existing AmazonS3Client, another one won't be created
+     * @param s3WriterConfiguration
+     *          Configuration of the write paths and instructions; still required.
+     */
+    public S3PersistWriter(AmazonS3Client amazonS3Client, S3WriterConfiguration s3WriterConfiguration) {
+        this.amazonS3Client = amazonS3Client;
+        this.s3WriterConfiguration = s3WriterConfiguration;
+    }
+
+    public S3PersistWriter(S3WriterConfiguration s3WriterConfiguration) {
+        this.s3WriterConfiguration = s3WriterConfiguration;
+    }
+
+    @Override
+    public void write(StreamsDatum streamsDatum) {
+
+        synchronized (this)
+        {
+            // Check to see if we need to reset the file that we are currently working with
+            if (this.currentWriter == null || ( this.bytesWrittenThisFile.get() >= (this.s3WriterConfiguration.getMaxFileSize() * 1024 * 1024))) {
+                try {
+                    LOGGER.info("Resetting the file");
+                    this.currentWriter = resetFile();
+                }
+                catch (Exception e) {
+                    LOGGER.warn("There was an error resetting the file: {}", e.getMessage());
+                }
+            }
+
+            String line = convertResultToString(streamsDatum);
+
+            // convertResultToString returns null when the document could not be serialized
+            if (line == null)
+                return;
+
+            try {
+                this.currentWriter.write(line);
+            } catch (IOException e) {
+                LOGGER.warn("There was an error writing to the output stream: {}", e.getMessage());
+            }
+
+            // add the bytes we've written
+            int recordSize = line.getBytes().length;
+            this.totalBytesWritten.addAndGet(recordSize);
+            this.bytesWrittenThisFile.addAndGet(recordSize);
+
+            // increment the record count
+            this.totalRecordsWritten.incrementAndGet();
+            this.fileLineCounter.incrementAndGet();
+        }
+    }
+
+    private synchronized OutputStreamWriter resetFile() throws Exception
+    {
+        // this will keep it thread safe, so we don't create too many files
+        if(this.fileLineCounter.get() == 0 && this.currentWriter != null)
+            return this.currentWriter;
+
+        closeAndDestroyWriter();
+
+        // Create the path for where the file is going to live.
+        try
+        {
+            // generate a file name
+            String fileName = this.s3WriterConfiguration.getWriterFilePrefix() +
+                    (this.s3WriterConfiguration.getChunk() ? "/" : "-") + new Date().getTime() + ".tsv";
+
+            // create the output stream
+            OutputStream outputStream = new S3OutputStreamWrapper(this.amazonS3Client,
+                    this.s3WriterConfiguration.getBucket(),
+                    this.s3WriterConfiguration.getWriterPath(),
+                    fileName,
+                    this.objectMetaData);
+
+            // reset the counters
+            this.fileLineCounter = new AtomicInteger();
+            this.bytesWrittenThisFile = new AtomicLong();
+
+            // add this to the list of written files
+            writtenFiles.add(this.s3WriterConfiguration.getWriterPath() + fileName);
+
+            // Log that we are creating this file
+            LOGGER.info("File Created: Bucket[{}] - {}", this.s3WriterConfiguration.getBucket(), this.s3WriterConfiguration.getWriterPath() + fileName);
+
+            // return the output stream
+            return new OutputStreamWriter(outputStream);
+        }
+        catch (Exception e)
+        {
+            LOGGER.error(e.getMessage());
+            throw e;
+        }
+    }
+
+    private synchronized void closeAndDestroyWriter() {
+        // if there is a current writer, we must close it first.
+        if (this.currentWriter != null) {
+            this.safeFlush(this.currentWriter);
+            this.closeSafely(this.currentWriter);
+            this.currentWriter = null;
+
+            LOGGER.info("File Closed: Records[{}] Bytes[{}] {} ", this.fileLineCounter.get(), this.bytesWrittenThisFile.get(), this.writtenFiles.get(this.writtenFiles.size()-1));
+        }
+    }
+
+    private synchronized void closeSafely(Writer writer) {
+        if(writer != null) {
+            try {
+                writer.flush();
+                writer.close();
+            }
+            catch(Exception e) {
+                // noOp
+            }
+            LOGGER.debug("File Closed");
+        }
+    }
+
+    private void safeFlush(Flushable flushable) {
+        // This is wrapped with a ByteArrayOutputStream, so this is really safe.
+        if(flushable != null) {
+            try {
+                flushable.flush();
+            }
+            catch(IOException e) {
+                // noOp
+            }
+        }
+    }
+
+    private String convertResultToString(StreamsDatum entry)
+    {
+        // Save the class name that the document came from. Do this *before* the metadata
+        // map is serialized, so the class entry actually lands in the output.
+        entry.getMetadata().put("class", entry.getDocument().getClass().getName());
+
+        String metadata = null;
+        try {
+            metadata = objectMapper.writeValueAsString(entry.getMetadata());
+        } catch (JsonProcessingException e) {
+            LOGGER.warn("Error serializing the metadata: {}", e.getMessage());
+        }
+
+        String documentJson = null;
+        try {
+            documentJson = objectMapper.writeValueAsString(entry.getDocument());
+        } catch (JsonProcessingException e) {
+            LOGGER.warn("Error serializing the document: {}", e.getMessage());
+        }
+
+        if(Strings.isNullOrEmpty(documentJson))
+            return null;
+        else
+            return entry.getId() + DELIMITER +        // [0] = Unique id of the verbatim
+                   entry.getTimestamp() + DELIMITER + // [1] = Timestamp of the item
+                   metadata + DELIMITER +             // [2] = Metadata of the item
+                   documentJson + "\n";               // [3] = The actual object
+    }
+
+    public void prepare(Object configurationObject) {
+        // Connect to S3
+        synchronized (this) {
+
+            // if the user has chosen not to set the object mapper, then set a default object mapper for them.
+            if(this.objectMapper == null)
+                this.objectMapper = new ObjectMapper();
+
+            // Create the credentials object
+            if(this.amazonS3Client == null)
+            {
+                AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey());
+
+                ClientConfiguration clientConfig = new ClientConfiguration();
+                clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toUpperCase()));
+
+                // We want path style access
+                S3ClientOptions clientOptions = new S3ClientOptions();
+                clientOptions.setPathStyleAccess(true);
+
+                this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
+                this.amazonS3Client.setS3ClientOptions(clientOptions);
+            }
+        }
+    }
+
+    public void cleanUp() {
+        closeAndDestroyWriter();
+    }
+
+    public DatumStatusCounter getDatumStatusCounter() {
+        DatumStatusCounter counters = new DatumStatusCounter();
+        counters.incrementAttempt(this.totalRecordsWritten.get());
+        counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten.get());
+        return counters;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java
new file mode 100644
index 0000000..d791c87
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java
@@ -0,0 +1,37 @@
+package org.apache.streams.s3;
+
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class S3PersistWriterTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistWriterTask.class);
+
+    private S3PersistWriter writer;
+
+    public S3PersistWriterTask(S3PersistWriter writer) {
+        this.writer = writer;
+    }
+
+    @Override
+    public void run() {
+        while(true) {
+            if( writer.persistQueue.peek() != null ) {
+                try {
+                    StreamsDatum entry = writer.persistQueue.remove();
+                    writer.write(entry);
+                } catch (Exception e) {
+                    LOGGER.warn("There was an error writing the datum: {}", e.getMessage());
+                }
+            }
+            try {
+                // back off briefly so an empty queue doesn't spin a core
+                Thread.sleep(1);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+
+}
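And the writer side, end to end (a sketch, not part of the commit; placeholders again, and it assumes StreamsDatum's two-argument constructor initializes the metadata map, since write() serializes it):

import org.apache.streams.core.StreamsDatum;

public class WriterSketch {
    public static void main(String[] args) {
        S3WriterConfiguration config = new S3WriterConfiguration();
        config.setBucket("my-bucket");
        config.setKey("AKIAEXAMPLE");
        config.setSecretKey("example-secret");
        config.setProtocol("https");
        config.setWriterPath("data/streams/");
        config.setWriterFilePrefix("demo");
        config.setMaxFileSize(20L);   // MB; past this size, a new file is started
        config.setChunk(true);

        S3PersistWriter writer = new S3PersistWriter(config);
        writer.prepare(null);

        // assumes the constructor leaves the datum's metadata map non-null
        writer.write(new StreamsDatum("{\"body\":\"hello\"}", "id-1"));

        writer.cleanUp(); // closes the current file, which triggers the S3 upload
    }
}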
+ "description": "Your Amazon Secret Key" + }, + "bucket": { + "type": "string", + "description": "The AWS bucket you want to write to" + }, + "protocol": { + "type": "string", + "description": "Whether you are using HTTP or HTTPS" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json new file mode 100644 index 0000000..2959b3d --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json @@ -0,0 +1,14 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.s3.S3ReaderConfiguration", + "extends": {"$ref":"S3Configuration.json"}, + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "readerPath": { + "type": "string", + "description": "Path below root path" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/544a0c92/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json new file mode 100644 index 0000000..f43087b --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json @@ -0,0 +1,28 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.s3.S3WriterConfiguration", + "extends": {"$ref":"S3Configuration.json"}, + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "writerPath": { + "type": "string", + "description": "Path " + }, + "writerFilePrefix": { + "type": "string", + "description": "File Prefix" + }, + "maxFileSize": { + "type": "integer", + "default" : 20, + "description": "If files are elected to be 'chunked' which they are by default, this is the maximum size of that file before the byte array stream is vacated and the file is created." + }, + "chunk": { + "type": "boolean", + "default" : true, + "description": "Whether you want the file chunked inside of a folder or not" + } + } +} \ No newline at end of file