Changes
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d8feb5ba Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d8feb5ba Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d8feb5ba Branch: refs/heads/master Commit: d8feb5ba06afae4716d3122d79796e2347679640 Parents: 5ff406f Author: rebanks <[email protected]> Authored: Wed Apr 2 12:01:10 2014 -0500 Committer: rebanks <[email protected]> Committed: Wed Apr 2 12:01:10 2014 -0500 ---------------------------------------------------------------------- .../elasticsearch/ElasticsearchPersistWriter.java | 3 ++- .../streams/twitter/test/SimpleTweetTest.java | 18 +++++++++++------- .../streams/local/builders/StreamComponent.java | 16 ++++++++++++---- .../streams/local/tasks/BaseStreamsTask.java | 3 ++- 4 files changed, 27 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d8feb5ba/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java index 9390219..0e16c2c 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java @@ -7,6 +7,7 @@ import com.google.common.base.Preconditions; import com.typesafe.config.Config; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.*; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -97,7 +98,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab protected volatile Queue<StreamsDatum> persistQueue; - private ObjectMapper mapper = new ObjectMapper(); + private ObjectMapper mapper = new StreamsJacksonMapper(); private ElasticsearchWriterConfiguration config; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d8feb5ba/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java index 8988de0..b8bfe1a 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java @@ -3,14 +3,11 @@ package org.apache.streams.twitter.test; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Optional; -import org.apache.commons.lang.StringUtils; +import org.apache.streams.core.StreamsDatum; import org.apache.streams.exceptions.ActivitySerializerException; import org.apache.streams.pojo.json.Activity; -import org.apache.streams.twitter.pojo.Delete; -import org.apache.streams.twitter.pojo.Retweet; import org.apache.streams.twitter.pojo.Tweet; -import org.apache.streams.twitter.provider.TwitterEventClassifier; +import org.apache.streams.twitter.processor.TwitterTypeConverter; import org.apache.streams.twitter.serializer.StreamsTwitterMapper; import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer; import org.junit.Assert; @@ -24,8 +21,6 @@ import java.io.InputStream; import java.io.InputStreamReader; import static org.hamcrest.CoreMatchers.*; -import static org.hamcrest.Matchers.greaterThan; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; /** @@ -82,6 +77,15 @@ public class SimpleTweetTest { Assert.fail(); } + try { + TwitterTypeConverter converter = new TwitterTypeConverter(String.class, Activity.class); + converter.prepare(null); + converter.process(new StreamsDatum(TWITTER_JSON)); + } catch (Throwable e) { + e.printStackTrace(); + Assert.fail(); + } + assertThat(activity, is(not(nullValue()))); assertThat(activity.getId(), is(not(nullValue()))); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d8feb5ba/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java index 6319ba8..f5e9978 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java @@ -165,10 +165,18 @@ public class StreamComponent { public StreamsTask createConnectedTask() { StreamsTask task; if(this.processor != null) { - task = new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor)); - task.addInputQueue(this.inQueue); - for(Queue<StreamsDatum> q : this.outBound.values()) { - task.addOutputQueue(q); + if(this.numTasks > 1) { + task = new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor)); + task.addInputQueue(this.inQueue); + for(Queue<StreamsDatum> q : this.outBound.values()) { + task.addOutputQueue(q); + } + } else { + task = new StreamsProcessorTask(this.processor); + task.addInputQueue(this.inQueue); + for(Queue<StreamsDatum> q : this.outBound.values()) { + task.addOutputQueue(q); + } } } else if(this.writer != null) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d8feb5ba/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java index 3799480..694cb76 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java @@ -3,6 +3,7 @@ package org.apache.streams.local.tasks; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.streams.core.StreamsDatum; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; import org.apache.streams.util.SerializationUtil; import org.slf4j.Logger; @@ -27,7 +28,7 @@ public abstract class BaseStreamsTask implements StreamsTask { private ObjectMapper mapper; public BaseStreamsTask() { - this.mapper = new ObjectMapper(); + this.mapper = new StreamsJacksonMapper(); this.mapper.registerSubtypes(Activity.class); }
