refactor persist-hdfs and persist-s3 for STREAMS-377
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/19f69e5a Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/19f69e5a Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/19f69e5a Branch: refs/heads/master Commit: 19f69e5ae37aead22f033d720f8cfb84ad20bb91 Parents: 1b2891d Author: Steve Blackmon (@steveblackmon) <sblack...@apache.org> Authored: Sun Dec 13 23:27:05 2015 -0600 Committer: Steve Blackmon (@steveblackmon) <sblack...@apache.org> Committed: Sun Dec 13 23:27:05 2015 -0600 ---------------------------------------------------------------------- .../org/apache/streams/s3/S3PersistReader.java | 8 ++++-- .../org/apache/streams/s3/S3PersistWriter.java | 22 +++++++++++---- .../org/apache/streams/s3/S3Configuration.json | 23 ++-------------- .../streams/hdfs/WebHdfsPersistReader.java | 13 +++++---- .../streams/hdfs/WebHdfsPersistWriter.java | 15 +++++----- .../apache/streams/hdfs/HdfsConfiguration.json | 29 +++----------------- 6 files changed, 44 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19f69e5a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java index b186288..702df71 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java @@ -32,7 +32,11 @@ import 
com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; import com.google.common.collect.Queues; import org.apache.streams.converter.LineReadWriteUtil; -import org.apache.streams.core.*; +import org.apache.streams.core.DatumStatusCountable; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistReader; +import org.apache.streams.core.StreamsResultSet; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,7 +112,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab public void prepare(Object configurationObject) { - lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration.getFields(), s3ReaderConfiguration.getFieldDelimiter(), s3ReaderConfiguration.getLineDelimiter()); + lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration); // Connect to S3 synchronized (this) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19f69e5a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java index e426983..3686f55 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java @@ -25,21 +25,31 @@ import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.S3ClientOptions; -import com.fasterxml.jackson.core.JsonProcessingException; 
import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.converter.LineReadWriteUtil; -import org.apache.streams.core.*; -import org.apache.streams.hdfs.WebHdfsPersistWriter; +import org.apache.streams.core.DatumStatus; +import org.apache.streams.core.DatumStatusCountable; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistWriter; import org.apache.streams.jackson.StreamsJacksonMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; -import java.util.*; +import java.io.Flushable; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -234,7 +244,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab public void prepare(Object configurationObject) { - lineWriterUtil = LineReadWriteUtil.getInstance(s3WriterConfiguration.getFields(), s3WriterConfiguration.getFieldDelimiter(), s3WriterConfiguration.getLineDelimiter()); + lineWriterUtil = LineReadWriteUtil.getInstance(s3WriterConfiguration); // Connect to S3 synchronized (this) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19f69e5a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json 
b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json index 7f2e9e5..403bfac 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json @@ -7,6 +7,9 @@ "type": "object", "javaType" : "org.apache.streams.s3.S3Configuration", "javaInterfaces": ["java.io.Serializable"], + "extends": { + "$ref": "../../../../../../../../../../streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/LineReadWriteConfiguration.json" + }, "properties": { "key": { "type": "string", @@ -33,26 +36,6 @@ "type": "string", "description": "The AWS region where your bucket resides", "required": false - }, - "fields": { - "type": "array", - "items": { - "type": "string" - }, - "default": [ - "ID", - "TS", - "META", - "DOC" - ] - }, - "field_delimiter": { - "type": "string", - "default": "\t" - }, - "line_delimiter": { - "type": "string", - "default": "\n" } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19f69e5a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java index ffdeb4c..24c9737 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java @@ -18,7 +18,6 @@ package org.apache.streams.hdfs; -import com.fasterxml.jackson.databind.JsonNode; import 
com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -28,13 +27,16 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.security.UserGroupInformation; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.converter.LineReadWriteUtil; -import org.apache.streams.core.*; +import org.apache.streams.core.DatumStatusCountable; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistReader; +import org.apache.streams.core.StreamsResultSet; import org.apache.streams.jackson.StreamsJacksonMapper; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -46,9 +48,8 @@ import java.net.URI; import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; import java.util.Collections; -import java.util.Map; -import java.util.Queue; import java.util.List; +import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -170,7 +171,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo @Override public void prepare(Object configurationObject) { LOGGER.debug("Prepare"); - lineReaderUtil = LineReadWriteUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter()); + lineReaderUtil = LineReadWriteUtil.getInstance(hdfsConfiguration); connectToWebHDFS(); String pathString = hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getReaderPath(); LOGGER.info("Path : {}", pathString); 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19f69e5a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java index 6ace93b..492eccb 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java @@ -18,11 +18,8 @@ package org.apache.streams.hdfs; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Joiner; import com.google.common.base.Strings; -import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; @@ -31,9 +28,12 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.converter.LineReadWriteUtil; -import org.apache.streams.core.*; +import org.apache.streams.core.DatumStatus; +import org.apache.streams.core.DatumStatusCountable; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistWriter; import org.apache.streams.jackson.StreamsJacksonMapper; -import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +46,6 @@ import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Date; -import java.util.Iterator; import 
java.util.List; import java.util.Queue; import java.util.zip.GZIPOutputStream; @@ -166,6 +165,8 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl String line = lineWriterUtil.convertResultToString(streamsDatum); writeInternal(line); + if( !line.endsWith(this.hdfsConfiguration.getLineDelimiter())) + writeInternal(this.hdfsConfiguration.getLineDelimiter()); int bytesInLine = line.getBytes().length; totalRecordsWritten++; @@ -268,7 +269,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl @Override public void prepare(Object configurationObject) { mapper = StreamsJacksonMapper.getInstance(); - lineWriterUtil = LineReadWriteUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter()); + lineWriterUtil = LineReadWriteUtil.getInstance(hdfsConfiguration); connectToWebHDFS(); path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getWriterPath()); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19f69e5a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json index 61245c4..2853002 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json +++ b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json @@ -7,6 +7,9 @@ "type": "object", "javaType" : "org.apache.streams.hdfs.HdfsConfiguration", "javaInterfaces": ["java.io.Serializable"], + "extends": { + "$ref": 
"../../../../../../../../../streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/LineReadWriteConfiguration.json" + }, "properties": { "scheme": { "type": "string", @@ -31,32 +34,8 @@ "description": "User" }, "password": { - "type": "string", - "description": "Password" - }, - "fields": { - "type": "array", - "items": { - "type": "string" - }, - "default": [ - "ID", - "TS", - "META", - "DOC" - ] - }, - "field_delimiter": { - "type": "string", - "default": "\t" - }, - "line_delimiter": { - "type": "string", - "default": "\n" - }, - "encoding": { "type": "string", - "default": "UTF-8" + "description": "Password" } } } \ No newline at end of file