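Resolves STREAMS-377: LineReadWriteUtil is now configured through a LineReadWriteConfiguration (fields, field delimiter, line delimiter, encoding), with one cached instance per configuration.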
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1b2891d4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1b2891d4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1b2891d4 Branch: refs/heads/STREAMS-389 Commit: 1b2891d43f428c8719e470554e8af8b8f674c7fe Parents: c2229a4 Author: Steve Blackmon (@steveblackmon) <sblack...@apache.org> Authored: Sun Dec 13 14:24:06 2015 -0600 Committer: Steve Blackmon (@steveblackmon) <sblack...@apache.org> Committed: Sun Dec 13 14:24:06 2015 -0600 ---------------------------------------------------------------------- .../streams/converter/LineReadWriteUtil.java | 66 +++++++------------- .../converter/LineReadWriteConfiguration.json | 36 +++++++++++ .../converter/test/TestLineReadWriteUtil.java | 58 ++++++++--------- 3 files changed, 87 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1b2891d4/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java index d418b52..a38568b 100644 --- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java +++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java @@ -24,13 +24,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.base.Strings; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import 
org.apache.streams.core.StreamsDatum; import org.apache.streams.jackson.StreamsJacksonMapper; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.math.BigInteger; import java.util.Iterator; import java.util.List; @@ -44,69 +44,45 @@ public class LineReadWriteUtil { private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class); - private static LineReadWriteUtil INSTANCE; + private static Map<LineReadWriteConfiguration, LineReadWriteUtil> INSTANCE_MAP = Maps.newConcurrentMap(); private final static List<String> DEFAULT_FIELDS = Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC"); private List<String> fields; private String fieldDelimiter = "\t"; private String lineDelimiter = "\n"; + private String encoding = "UTF-8"; private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); private LineReadWriteUtil() { - this(LineReadWriteUtil.DEFAULT_FIELDS); } - private LineReadWriteUtil(List<String> fields) { - if( fields != null && fields.size() > 0) this.fields = fields; - else this.fields = LineReadWriteUtil.DEFAULT_FIELDS; + private LineReadWriteUtil(LineReadWriteConfiguration configuration) { + this.fields = configuration.getFields(); + this.fieldDelimiter = configuration.getFieldDelimiter(); + this.lineDelimiter = configuration.getLineDelimiter(); + this.encoding = configuration.getEncoding(); } - private LineReadWriteUtil(List<String> fields, String fieldDelimiter) { - this(fields); - if( fieldDelimiter != null ) this.fieldDelimiter = fieldDelimiter; + public static LineReadWriteUtil getInstance() { + return getInstance(new LineReadWriteConfiguration()); } - private LineReadWriteUtil(List<String> fields, String fieldDelimiter, String lineDelimiter) { - this(fields); - if( fieldDelimiter != null ) this.fieldDelimiter = fieldDelimiter; - if( lineDelimiter != null ) this.lineDelimiter = lineDelimiter; - } - - public static LineReadWriteUtil getInstance(){ - 
if( INSTANCE == null ) - INSTANCE = new LineReadWriteUtil(LineReadWriteUtil.DEFAULT_FIELDS); - return INSTANCE; - } - - public static LineReadWriteUtil getInstance(List<String> fields){ - if( INSTANCE == null ) - INSTANCE = new LineReadWriteUtil(fields); - else if( !INSTANCE.fields.equals(fields)) - return new LineReadWriteUtil(fields); - return INSTANCE; - } - - public static LineReadWriteUtil getInstance(List<String> fields, String fieldDelimiter){ - if( INSTANCE == null ) - INSTANCE = new LineReadWriteUtil(fields, fieldDelimiter); - else if( !INSTANCE.fields.equals(fields) || !INSTANCE.fieldDelimiter.equals(fieldDelimiter)) - return new LineReadWriteUtil(fields, fieldDelimiter); - return INSTANCE; - } - - public static LineReadWriteUtil getInstance(List<String> fields, String fieldDelimiter, String lineDelimiter){ - if( INSTANCE == null ) - INSTANCE = new LineReadWriteUtil(fields, fieldDelimiter, lineDelimiter); - else if( !INSTANCE.fields.equals(fields) || !INSTANCE.fieldDelimiter.equals(fieldDelimiter) || !INSTANCE.fieldDelimiter.equals(lineDelimiter)) - return new LineReadWriteUtil(fields, fieldDelimiter, lineDelimiter); - return INSTANCE; + public static LineReadWriteUtil getInstance(LineReadWriteConfiguration configuration) { + if( INSTANCE_MAP.containsKey(configuration) && + INSTANCE_MAP.get(configuration) != null) + return INSTANCE_MAP.get(configuration); + else { + INSTANCE_MAP.put(configuration, new LineReadWriteUtil(configuration)); + return INSTANCE_MAP.get(configuration); + } } public StreamsDatum processLine(String line) { List<String> expectedFields = fields; + if( line.endsWith(lineDelimiter)) line = trimLineDelimiter(line); String[] parsedFields = line.split(fieldDelimiter); if( parsedFields.length == 0) @@ -143,7 +119,7 @@ public class LineReadWriteUtil { metadata = parseMap(parsedFields[expectedFields.indexOf(FieldConstants.META)]); } - StreamsDatum datum = new StreamsDatum(trimLineDelimiter(json)); + StreamsDatum datum = new 
StreamsDatum(json); datum.setId(id); datum.setTimestamp(ts); datum.setMetadata(metadata); @@ -203,7 +179,7 @@ public class LineReadWriteUtil { } joiner.appendTo(stringBuilder, fielddata); - return stringBuilder.append(lineDelimiter).toString(); + return stringBuilder.toString(); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1b2891d4/streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/LineReadWriteConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/LineReadWriteConfiguration.json b/streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/LineReadWriteConfiguration.json new file mode 100644 index 0000000..13e8428 --- /dev/null +++ b/streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/LineReadWriteConfiguration.json @@ -0,0 +1,36 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "id": "#", + "type": "object", + "javaType" : "org.apache.streams.converter.LineReadWriteConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "fields": { + "type": "array", + "items": { + "type": "string" + }, + "default": [ + "ID", + "TS", + "META", + "DOC" + ] + }, + "field_delimiter": { + "type": "string", + "default": "\t" + }, + "line_delimiter": { + "type": "string", + "default": "\n" + }, + "encoding": { + "type": "string", + "default": "UTF-8" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1b2891d4/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java ---------------------------------------------------------------------- diff --git 
a/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java index ed61203..955eef7 100644 --- a/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java +++ b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java @@ -22,24 +22,16 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import junit.framework.Assert; -import org.apache.commons.io.Charsets; -import org.apache.commons.io.IOUtils; +import org.apache.streams.converter.LineReadWriteConfiguration; import org.apache.streams.converter.LineReadWriteUtil; import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsResultSet; import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; import org.joda.time.DateTime; -import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.InputStream; import java.math.BigInteger; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Random; @@ -60,31 +52,41 @@ public class TestLineReadWriteUtil { @Test public void TestLineReadWrite () throws Exception { - List<List<String>> fieldArrays = Lists.newArrayList(); - fieldArrays.add(Lists.newArrayList("ID")); - fieldArrays.add(Lists.newArrayList("DOC")); - fieldArrays.add(Lists.newArrayList("ID", "DOC")); - fieldArrays.add(Lists.newArrayList("ID", "TS", "DOC")); - fieldArrays.add(Lists.newArrayList("ID", "TS", "META", "DOC")); - fieldArrays.add(Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC")); - - TestLineReadWriteCase(fieldArrays.get(0), null, 
null); - TestLineReadWriteCase(fieldArrays.get(1), "\t", null ); - TestLineReadWriteCase(fieldArrays.get(2), "\t", "\n" ); - TestLineReadWriteCase(fieldArrays.get(3), null, "\n" ); - TestLineReadWriteCase(fieldArrays.get(4), "|", "\n" ); - TestLineReadWriteCase(fieldArrays.get(5), "|", "\\0" ); + List<LineReadWriteConfiguration> configs = Lists.newArrayList(); + configs.add(new LineReadWriteConfiguration()); + configs.add(new LineReadWriteConfiguration() + .withFields(Lists.newArrayList("ID"))); + configs.add(new LineReadWriteConfiguration() + .withFields(Lists.newArrayList("DOC")) + .withFieldDelimiter("\t")); + configs.add(new LineReadWriteConfiguration() + .withFields(Lists.newArrayList("ID", "DOC")) + .withFieldDelimiter("\t") + .withLineDelimiter("\n")); + configs.add(new LineReadWriteConfiguration() + .withFields(Lists.newArrayList("ID", "TS", "DOC")) + .withLineDelimiter("\n")); + configs.add(new LineReadWriteConfiguration() + .withFields(Lists.newArrayList("ID", "TS", "META", "DOC")) + .withFieldDelimiter("|") + .withLineDelimiter("\n")); + configs.add(new LineReadWriteConfiguration() + .withFields(Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC")) + .withFieldDelimiter("|") + .withLineDelimiter("\\0")); + + for(LineReadWriteConfiguration config : configs) + TestLineReadWriteCase(config); } - public void TestLineReadWriteCase(List<String> fields, String fieldDelimiter, String lineDelimiter) throws Exception { + public void TestLineReadWriteCase(LineReadWriteConfiguration lineReadWriteConfiguration) throws Exception { LineReadWriteUtil lineReadWriteUtil; - if( lineDelimiter != null && fieldDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, fieldDelimiter, lineDelimiter); - else if( lineDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, null, lineDelimiter); - else if( fieldDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, fieldDelimiter); - else lineReadWriteUtil = 
LineReadWriteUtil.getInstance(fields); + lineReadWriteUtil = LineReadWriteUtil.getInstance(lineReadWriteConfiguration); + + assert(lineReadWriteUtil != null); StreamsDatum testDatum = randomDatum(); String writeResult = lineReadWriteUtil.convertResultToString(testDatum); assert !Strings.isNullOrEmpty(writeResult);