Tweaks to Twitter & ES to restore examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b8fef9d1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b8fef9d1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b8fef9d1 Branch: refs/heads/master Commit: b8fef9d1b04734e959cf2aa01d5314f6aa0e844d Parents: da2d80c Author: sblackmon <[email protected]> Authored: Wed Apr 2 16:32:26 2014 -0500 Committer: sblackmon <[email protected]> Committed: Wed Apr 2 16:32:26 2014 -0500 ---------------------------------------------------------------------- .../ElasticsearchConfigurator.java | 31 ++++++++++++++++ .../ElasticsearchPersistReader.java | 4 +- .../ElasticsearchPersistWriter.java | 2 + .../ElasticsearchReaderConfiguration.json | 18 ++++++--- .../processor/TwitterEventProcessor.java | 39 ++++++-------------- .../twitter/processor/TwitterTypeConverter.java | 7 ++-- .../provider/TwitterStreamConfigurator.java | 27 ++++++++++---- .../twitter/provider/TwitterStreamProvider.java | 19 ++++++++-- .../provider/TwitterTimelineProvider.java | 1 + .../streams/jackson/StreamsJacksonMapper.java | 5 ++- .../streams/jackson/StreamsJacksonModule.java | 5 +++ .../streams/local/tasks/BaseStreamsTask.java | 24 ++++++++---- .../org/apache/streams/util/ComponentUtils.java | 18 +++++++++ 13 files changed, 143 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java index 224f7da..20b5c08 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java @@ -1,5 +1,6 @@ package org.apache.streams.elasticsearch; +import com.fasterxml.jackson.databind.ObjectMapper; import com.typesafe.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -13,6 +14,8 @@ public class ElasticsearchConfigurator { private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfigurator.class); + private final static ObjectMapper mapper = new ObjectMapper(); + public static ElasticsearchConfiguration detectConfiguration(Config elasticsearch) { List<String> hosts = elasticsearch.getStringList("hosts"); Long port = elasticsearch.getLong("port"); @@ -27,4 +30,32 @@ public class ElasticsearchConfigurator { return elasticsearchConfiguration; } + public static ElasticsearchReaderConfiguration detectReaderConfiguration(Config elasticsearch) { + + ElasticsearchConfiguration elasticsearchConfiguration = detectConfiguration(elasticsearch); + ElasticsearchReaderConfiguration elasticsearchReaderConfiguration = mapper.convertValue(elasticsearchConfiguration, ElasticsearchReaderConfiguration.class); + + List<String> indexes = elasticsearch.getStringList("indexes"); + List<String> types = elasticsearch.getStringList("types"); + + elasticsearchReaderConfiguration.setIndexes(indexes); + elasticsearchReaderConfiguration.setTypes(types); + + return elasticsearchReaderConfiguration; + } + + public static ElasticsearchWriterConfiguration detectWriterConfiguration(Config elasticsearch) { + + ElasticsearchConfiguration elasticsearchConfiguration = detectConfiguration(elasticsearch); + ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = mapper.convertValue(elasticsearchConfiguration, ElasticsearchWriterConfiguration.class); + + String index = elasticsearch.getString("index"); + String type = elasticsearch.getString("type"); + + elasticsearchWriterConfiguration.setIndex(index); + elasticsearchWriterConfiguration.setType(type); + + return elasticsearchWriterConfiguration; + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java index 8ffcbd5..cc9b3fc 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java @@ -105,8 +105,8 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl } public ElasticsearchPersistReader(ElasticsearchReaderConfiguration elasticsearchConfiguration) { this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration); - indexes.add(elasticsearchConfiguration.getIndex()); - types.add(elasticsearchConfiguration.getType()); + indexes.addAll(elasticsearchConfiguration.getIndexes()); + types.addAll(elasticsearchConfiguration.getTypes()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java index 9390219..ab35edd 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java @@ -61,6 +61,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab private volatile int totalSent = 0; private volatile int totalSeconds = 0; + private volatile int totalAttempted = 0; private volatile int totalOk = 0; private volatile int totalFailed = 0; private volatile int totalBatchCount = 0; @@ -310,6 +311,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab thisOk++; } + totalAttempted += thisSent; totalOk += thisOk; totalFailed += thisFailed; totalSeconds += (thisMillis / 1000); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json index 698da1a..1f1c720 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json +++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json @@ -6,13 +6,19 @@ "extends": {"$ref":"ElasticsearchConfiguration.json"}, "javaInterfaces": ["java.io.Serializable"], "properties": { - "index": { - "type": "string", - "description": "Index to write to" + "indexes": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Indexes to read from" }, - "type": { - "type": "string", - "description": "Type to write as" + "types": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Types to read from" } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java index abc0c1a..270172f 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java @@ -16,6 +16,7 @@ import org.apache.streams.twitter.pojo.Retweet; import org.apache.streams.twitter.pojo.Tweet; import org.apache.streams.twitter.provider.TwitterEventClassifier; import org.apache.streams.twitter.serializer.*; +import org.apache.streams.util.ComponentUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,9 +42,7 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable { private Class inClass; private Class outClass; - private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer(); - private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer(); - private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer(); + private TwitterJsonActivitySerializer twitterJsonActivitySerializer; public final static String TERMINATE = new String("TERMINATE"); @@ -65,22 +64,20 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable { while(true) { String item; try { - item = inQueue.take(); + + item = ComponentUtils.pollUntilStringNotEmpty(inQueue); + if(item instanceof String && item.equals(TERMINATE)) { LOGGER.info("Terminating!"); break; } - System.out.println(item); + ObjectNode objectNode = (ObjectNode) mapper.readTree(item); - if( StringUtils.isNotEmpty(item) ) { - ObjectNode objectNode = (ObjectNode) mapper.readTree(item); + StreamsDatum rawDatum = new StreamsDatum(objectNode); - StreamsDatum rawDatum = new StreamsDatum(objectNode); - - for (StreamsDatum entry : process(rawDatum)) { - outQueue.offer(entry); - } + for (StreamsDatum entry : process(rawDatum)) { + ComponentUtils.offerUntilSuccess(entry, outQueue); } } catch (Exception e) { @@ -95,21 +92,9 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable { Object result = null; if( outClass.equals( Activity.class )) { - if( inClass.equals( Delete.class )) { - LOGGER.debug("ACTIVITY DELETE"); - result = twitterJsonDeleteActivitySerializer.deserialize( - mapper.writeValueAsString(event)); - } else if ( inClass.equals( Retweet.class )) { - LOGGER.debug("ACTIVITY RETWEET"); - result = twitterJsonRetweetActivitySerializer.deserialize( + LOGGER.debug("ACTIVITY"); + result = twitterJsonActivitySerializer.deserialize( mapper.writeValueAsString(event)); - } else if ( inClass.equals( Tweet.class )) { - LOGGER.debug("ACTIVITY TWEET"); - result = twitterJsonTweetActivitySerializer.deserialize( - mapper.writeValueAsString(event)); - } else { - return null; - } } else if( outClass.equals( Tweet.class )) { if ( inClass.equals( Tweet.class )) { LOGGER.debug("TWEET"); @@ -210,7 +195,7 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable { @Override public void prepare(Object configurationObject) { - + twitterJsonActivitySerializer = new TwitterJsonActivitySerializer(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java index 1c1e2fb..68820c9 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java @@ -31,7 +31,7 @@ public class TwitterTypeConverter implements StreamsProcessor { private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class); - private ObjectMapper mapper = new StreamsTwitterMapper(); + private ObjectMapper mapper; private Queue<StreamsDatum> inQueue; private Queue<StreamsDatum> outQueue; @@ -39,7 +39,7 @@ public class TwitterTypeConverter implements StreamsProcessor { private Class inClass; private Class outClass; - private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer(); + private TwitterJsonActivitySerializer twitterJsonActivitySerializer; public final static String TERMINATE = new String("TERMINATE"); @@ -176,7 +176,8 @@ public class TwitterTypeConverter implements StreamsProcessor { @Override public void prepare(Object o) { - + mapper = new StreamsTwitterMapper(); + twitterJsonActivitySerializer = new TwitterJsonActivitySerializer(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java index 2ae8d59..b1a1a07 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java @@ -4,6 +4,7 @@ import com.google.common.collect.Lists; import com.typesafe.config.Config; import com.typesafe.config.ConfigException; import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.twitter.TwitterBasicAuthConfiguration; import org.apache.streams.twitter.TwitterOAuthConfiguration; import org.apache.streams.twitter.TwitterStreamConfiguration; import org.slf4j.Logger; @@ -19,23 +20,35 @@ public class TwitterStreamConfigurator { private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamConfigurator.class); public static TwitterStreamConfiguration detectConfiguration(Config twitter) { - Config oauth = StreamsConfigurator.config.getConfig("twitter.oauth"); TwitterStreamConfiguration twitterStreamConfiguration = new TwitterStreamConfiguration(); twitterStreamConfiguration.setProtocol(twitter.getString("protocol")); twitterStreamConfiguration.setHost(twitter.getString("host")); twitterStreamConfiguration.setPort(twitter.getLong("port")); twitterStreamConfiguration.setVersion(twitter.getString("version")); - TwitterOAuthConfiguration twitterOAuthConfiguration = new TwitterOAuthConfiguration(); - twitterOAuthConfiguration.setConsumerKey(oauth.getString("consumerKey")); - twitterOAuthConfiguration.setConsumerSecret(oauth.getString("consumerSecret")); - twitterOAuthConfiguration.setAccessToken(oauth.getString("accessToken")); - twitterOAuthConfiguration.setAccessTokenSecret(oauth.getString("accessTokenSecret")); - twitterStreamConfiguration.setOauth(twitterOAuthConfiguration); + + try { + Config basicauth = StreamsConfigurator.config.getConfig("twitter.basicauth"); + TwitterBasicAuthConfiguration twitterBasicAuthConfiguration = new TwitterBasicAuthConfiguration(); + twitterBasicAuthConfiguration.setUsername(basicauth.getString("username")); + twitterBasicAuthConfiguration.setPassword(basicauth.getString("password")); + twitterStreamConfiguration.setBasicauth(twitterBasicAuthConfiguration); + } catch( ConfigException ce ) {} + + try { + Config oauth = StreamsConfigurator.config.getConfig("twitter.oauth"); + TwitterOAuthConfiguration twitterOAuthConfiguration = new TwitterOAuthConfiguration(); + twitterOAuthConfiguration.setConsumerKey(oauth.getString("consumerKey")); + twitterOAuthConfiguration.setConsumerSecret(oauth.getString("consumerSecret")); + twitterOAuthConfiguration.setAccessToken(oauth.getString("accessToken")); + twitterOAuthConfiguration.setAccessTokenSecret(oauth.getString("accessTokenSecret")); + twitterStreamConfiguration.setOauth(twitterOAuthConfiguration); + } catch( ConfigException ce ) {} try { twitterStreamConfiguration.setTrack(twitter.getStringList("track")); } catch( ConfigException ce ) {} + try { List<Long> follows = Lists.newArrayList(); for( Integer id : twitter.getIntList("follow")) http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java index 6a3def6..2b8a2f1 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java @@ -13,6 +13,7 @@ import com.twitter.hbc.core.Constants; import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint; import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint; import com.twitter.hbc.core.endpoint.StreamingEndpoint; +import com.twitter.hbc.core.endpoint.UserstreamEndpoint; import com.twitter.hbc.core.processor.StringDelimitedProcessor; import com.twitter.hbc.httpclient.BasicClient; import com.twitter.hbc.httpclient.auth.Authentication; @@ -135,7 +136,18 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable { Preconditions.checkNotNull(this.klass); Preconditions.checkNotNull(config.getEndpoint()); - if(config.getEndpoint().endsWith("sample.json") ) { + + if(config.getEndpoint().endsWith("user.json") ) { + endpoint = new UserstreamEndpoint(); + + Optional<String> with = Optional.fromNullable(config.getWith()); + Optional<String> replies = Optional.fromNullable(config.getReplies()); + + if( with.isPresent() ) endpoint.addPostParameter("with", with.get()); + if( replies.isPresent() ) endpoint.addPostParameter("replies", replies.get()); + + } + else if(config.getEndpoint().endsWith("sample.json") ) { endpoint = new StatusesSampleEndpoint(); Optional<List<String>> track = Optional.fromNullable(config.getTrack()); @@ -143,6 +155,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable { if( track.isPresent() ) endpoint.addPostParameter("track", Joiner.on(",").join(track.get())); if( follow.isPresent() ) endpoint.addPostParameter("follow", Joiner.on(",").join(follow.get())); + } else if( config.getEndpoint().endsWith("firehose.json")) endpoint = new StatusesFirehoseEndpoint(); @@ -174,12 +187,10 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable { return; } - endpoint.addPostParameter("with", config.getWith()); - endpoint.addPostParameter("replies", config.getReplies()); client = new ClientBuilder() .name("apache/streams/streams-contrib/streams-provider-twitter") - .hosts(Constants.STREAM_HOST) + .hosts(config.getProtocol() + "://" + config.getHost()) .endpoint(endpoint) .authentication(auth) .processor(new StringDelimitedProcessor(inQueue)) http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java index 40fb961..242d943 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java @@ -254,6 +254,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { Preconditions.checkNotNull(config.getFollow()); + Preconditions.checkArgument(config.getHost().equals("api.twitter.com")); Preconditions.checkArgument(config.getEndpoint().equals("statuses/user_timeline")); Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java ---------------------------------------------------------------------- diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java index 275ed7e..7ef74ee 100644 --- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java +++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.deser.std.StdDeserializer; import com.fasterxml.jackson.databind.module.SimpleModule; import org.joda.time.DateTime; @@ -37,7 +38,9 @@ public class StreamsJacksonMapper extends ObjectMapper { configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.FALSE); configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, Boolean.TRUE); - setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + // If a user has an 'object' that does not have an explicit mapping, don't cause the serialization to fail. + configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, Boolean.FALSE); + setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.DEFAULT); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java ---------------------------------------------------------------------- diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java index 323e40c..bcf0f65 100644 --- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java +++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java @@ -2,6 +2,7 @@ package org.apache.streams.jackson; import com.fasterxml.jackson.databind.module.SimpleModule; import org.joda.time.DateTime; +import org.joda.time.Period; /** * Created by sblackmon on 3/27/14. @@ -12,6 +13,10 @@ public class StreamsJacksonModule extends SimpleModule { super(); addSerializer(DateTime.class, new StreamsDateTimeSerializer(DateTime.class)); addDeserializer(DateTime.class, new StreamsDateTimeDeserializer(DateTime.class)); + + addSerializer(Period.class, new StreamsPeriodSerializer(Period.class)); + addDeserializer(Period.class, new StreamsPeriodDeserializer(Period.class)); } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java index 694cb76..8006560 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java @@ -10,10 +10,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; +import java.util.*; /** * @@ -116,13 +113,13 @@ public abstract class BaseStreamsTask implements StreamsTask { try { if(datum.document instanceof ObjectNode) { - return new StreamsDatum(((ObjectNode) datum.document).deepCopy(), datum.timestamp, datum.sequenceid); + return copyMetaData(datum, new StreamsDatum(((ObjectNode) datum.document).deepCopy(), datum.timestamp, datum.sequenceid)); } else if(datum.document instanceof Activity) { - return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), Activity.class), + return copyMetaData(datum, new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), Activity.class), datum.timestamp, - datum.sequenceid); + datum.sequenceid)); } // else if(this.mapper.canSerialize(datum.document.getClass())){ // return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), datum.document.getClass()), @@ -156,4 +153,17 @@ public abstract class BaseStreamsTask implements StreamsTask { } while( !success ); } + + private StreamsDatum copyMetaData(StreamsDatum copyFrom, StreamsDatum copyTo) { + Map<String, Object> fromMeta = copyFrom.getMetadata(); + Map<String, Object> toMeta = copyTo.getMetadata(); + for(String key : fromMeta.keySet()) { + Object value = fromMeta.get(key); + if(value instanceof Serializable) + toMeta.put(key, SerializationUtil.cloneBySerialization(value)); + else //hope for the best - should be serializable + toMeta.put(key, value); + } + return copyTo; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java index 5bf1d53..609d113 100644 --- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java +++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java @@ -1,5 +1,7 @@ package org.apache.streams.util; +import org.apache.commons.lang3.StringUtils; + import java.util.Queue; /** @@ -19,4 +21,20 @@ public class ComponentUtils { while( !success ); } + public static String pollUntilStringNotEmpty(Queue queue) { + + String result = null; + do { + synchronized( ComponentUtils.class ) { + try { + result = (String) queue.remove(); + } catch( Exception e ) {} + } + Thread.yield(); + } + while( result == null && !StringUtils.isNotEmpty(result) ); + + return result; + } + }
