Repository: incubator-streams Updated Branches: refs/heads/master 42d0ab33f -> d88e8c889
Writer can now write to us-west-1 Simplified configurator Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/25af3ad3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/25af3ad3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/25af3ad3 Branch: refs/heads/master Commit: 25af3ad3dadf40d57eeb62479b3e73cd5170275e Parents: 1788691 Author: sblackmon <[email protected]> Authored: Tue Oct 7 13:44:22 2014 -0500 Committer: sblackmon <[email protected]> Committed: Tue Oct 7 13:44:22 2014 -0500 ---------------------------------------------------------------------- streams-contrib/streams-amazon-aws/pom.xml | 2 +- .../org/apache/streams/s3/S3Configurator.java | 55 +++++++++----------- .../org/apache/streams/s3/S3PersistReader.java | 11 ++-- .../org/apache/streams/s3/S3PersistWriter.java | 10 ++-- .../org/apache/streams/s3/S3Configuration.json | 18 +++++-- 5 files changed, 55 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/25af3ad3/streams-contrib/streams-amazon-aws/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/pom.xml b/streams-contrib/streams-amazon-aws/pom.xml index 57a67cb..c9b73ae 100644 --- a/streams-contrib/streams-amazon-aws/pom.xml +++ b/streams-contrib/streams-amazon-aws/pom.xml @@ -45,7 +45,7 @@ <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk</artifactId> - <version>1.7.5</version> + <version>1.8.11</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/25af3ad3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java index dfa0426..6bf1672 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java @@ -18,7 +18,9 @@ package org.apache.streams.s3; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import com.typesafe.config.Config; +import com.typesafe.config.ConfigRenderOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,53 +32,46 @@ public class S3Configurator { public static S3Configuration detectConfiguration(Config s3) { - S3Configuration s3Configuration = new S3Configuration(); + S3Configuration s3Configuration = null; - s3Configuration.setBucket(s3.getString("bucket")); - s3Configuration.setKey(s3.getString("key")); - s3Configuration.setSecretKey(s3.getString("secretKey")); - - // The Amazon S3 Library defaults to HTTPS - String protocol = (!s3.hasPath("protocol") ? "https": s3.getString("protocol")).toLowerCase(); - - if(!(protocol.equals("https") || protocol.equals("http"))) { - // you must specify either HTTP or HTTPS - throw new RuntimeException("You must specify either HTTP or HTTPS as a protocol"); + try { + s3Configuration = mapper.readValue(s3.root().render(ConfigRenderOptions.concise()), S3Configuration.class); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.warn("Could not parse S3Configuration"); } - s3Configuration.setProtocol(protocol.toLowerCase()); - return s3Configuration; } public static S3ReaderConfiguration detectReaderConfiguration(Config s3) { - S3Configuration S3Configuration = detectConfiguration(s3); - S3ReaderConfiguration s3ReaderConfiguration = mapper.convertValue(S3Configuration, S3ReaderConfiguration.class); + S3ReaderConfiguration s3Configuration = null; - s3ReaderConfiguration.setReaderPath(s3.getString("readerPath")); + try { + s3Configuration = mapper.readValue(s3.root().render(ConfigRenderOptions.concise()), S3ReaderConfiguration.class); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.warn("Could not parse S3Configuration"); + } - return s3ReaderConfiguration; + return s3Configuration; } public static S3WriterConfiguration detectWriterConfiguration(Config s3) { - S3Configuration s3Configuration = detectConfiguration(s3); - S3WriterConfiguration s3WriterConfiguration = mapper.convertValue(s3Configuration, S3WriterConfiguration.class); + S3WriterConfiguration s3Configuration = null; - String rootPath = s3.getString("writerPath"); - - // if the root path doesn't end in a '/' then we need to force the '/' at the end of the path. - s3WriterConfiguration.setWriterPath(rootPath + (rootPath.endsWith("/") ? "" : "/")); - - s3WriterConfiguration.setWriterFilePrefix(s3.hasPath("writerFilePrefix") ? s3.getString("writerFilePrefix") : "default"); + try { + s3Configuration = mapper.readValue(s3.root().render(ConfigRenderOptions.concise()), S3WriterConfiguration.class); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.warn("Could not parse S3Configuration"); + } - if(s3.hasPath("maxFileSize")) - s3WriterConfiguration.setMaxFileSize((long)s3.getInt("maxFileSize")); - if(s3.hasPath("chunk")) - s3WriterConfiguration.setChunk(s3.getBoolean("chunk")); + Preconditions.checkArgument(s3Configuration.getWriterPath().endsWith("/"), s3Configuration.getWriterPath() + " must end with '/'"); - return s3WriterConfiguration; + return s3Configuration; } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/25af3ad3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java index 4f62a06..5709f22 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java @@ -21,12 +21,15 @@ import com.amazonaws.ClientConfiguration; import com.amazonaws.Protocol; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.S3ClientOptions; import com.amazonaws.services.s3.model.ListObjectsRequest; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; import com.google.common.collect.Queues; import org.apache.streams.core.*; import org.joda.time.DateTime; @@ -104,13 +107,15 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab AWSCredentials credentials = new BasicAWSCredentials(s3ReaderConfiguration.getKey(), s3ReaderConfiguration.getSecretKey()); ClientConfiguration clientConfig = new ClientConfiguration(); - clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toUpperCase())); + clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toString())); - // We want path style access + // We do not want path style access S3ClientOptions clientOptions = new S3ClientOptions(); - clientOptions.setPathStyleAccess(true); + clientOptions.setPathStyleAccess(false); this.amazonS3Client = new AmazonS3Client(credentials, clientConfig); + if( !Strings.isNullOrEmpty(s3ReaderConfiguration.getRegion())) + this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3ReaderConfiguration.getRegion()))); this.amazonS3Client.setS3ClientOptions(clientOptions); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/25af3ad3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java index 058f748..9111265 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java @@ -21,6 +21,8 @@ import com.amazonaws.ClientConfiguration; import com.amazonaws.Protocol; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.S3ClientOptions; import com.fasterxml.jackson.core.JsonProcessingException; @@ -256,13 +258,15 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey()); ClientConfiguration clientConfig = new ClientConfiguration(); - clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toUpperCase())); + clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toString())); - // We want path style access + // We do not want path style access S3ClientOptions clientOptions = new S3ClientOptions(); - clientOptions.setPathStyleAccess(true); + clientOptions.setPathStyleAccess(false); this.amazonS3Client = new AmazonS3Client(credentials, clientConfig); + if( !Strings.isNullOrEmpty(s3WriterConfiguration.getRegion())) + this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3WriterConfiguration.getRegion()))); this.amazonS3Client.setS3ClientOptions(clientOptions); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/25af3ad3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json index 863668f..36e89d0 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json @@ -7,19 +7,29 @@ "properties": { "key": { "type": "string", - "description": "Your Amazon Key" + "description": "Your Amazon Key", + "required": true }, "secretKey": { "type": "string", - "description": "Your Amazon Secret Key" + "description": "Your Amazon Secret Key", + "required": true }, "bucket": { "type": "string", - "description": "The AWS bucket you want to write to" + "description": "Your AWS bucket", + "required": true }, "protocol": { "type": "string", - "description": "Whether you are using HTTP or HTTPS" + "description": "Whether you are using HTTP or HTTPS", + "enum": ["HTTP", "HTTPS"], + "default": "HTTPS" + }, + "region": { + "type": "string", + "description": "The AWS region where your bucket resides", + "required": true } } } \ No newline at end of file
