fixes to get streams-examples working
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7e4e1095 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7e4e1095 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7e4e1095 Branch: refs/heads/master Commit: 7e4e1095f09321210d6ce3acb56f4041680f2d6d Parents: 7293594 Author: sblackmon <[email protected]> Authored: Thu May 8 13:41:28 2014 -0500 Committer: sblackmon <[email protected]> Committed: Thu May 8 13:41:28 2014 -0500 ---------------------------------------------------------------------- .../ElasticsearchPersistWriter.java | 80 ++--------------- .../streams-provider-twitter/pom.xml | 13 +++ .../provider/TwitterEventClassifier.java | 10 ++- .../twitter/provider/TwitterStreamProvider.java | 7 +- .../provider/TwitterTimelineProvider.java | 7 +- .../TwitterJsonActivitySerializer.java | 3 +- .../TwitterJsonDeleteActivitySerializer.java | 3 +- .../TwitterJsonRetweetActivitySerializer.java | 3 +- .../TwitterJsonTweetActivitySerializer.java | 7 +- ...erJsonUserstreameventActivitySerializer.java | 92 ++++++++++++++++++++ .../main/jsonschema/com/twitter/FriendList.json | 20 +++++ .../jsonschema/com/twitter/UserstreamEvent.json | 47 ++++++++++ .../jackson/StreamsDateTimeDeserializer.java | 3 +- .../jackson/StreamsDateTimeSerializer.java | 5 +- .../jackson/StreamsPeriodDeserializer.java | 3 +- .../jackson/StreamsPeriodSerializer.java | 3 +- 16 files changed, 211 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java index 80d2775..e99c4ff 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java @@ -47,10 +47,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###"); private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l; private static final long WAITING_DOCS_LIMIT = 10000; - private static final int BYTES_IN_MB = 1024 * 1024; - private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB; - - private final List<String> affectedIndexes = new ArrayList<String>(); + private static final int BYTES_IN_MB = 1024*1024; + private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB; private ObjectMapper mapper = new StreamsJacksonMapper(); private ElasticsearchClientManager manager; @@ -62,8 +60,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab private long batchSize; private boolean veryLargeBulk; // by default this setting is set to false - private int totalRecordsWritten = 0; - + protected Thread task; protected volatile Queue<StreamsDatum> persistQueue; @@ -79,8 +76,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab private volatile long totalSizeInBytes = 0; private volatile long batchSizeInBytes = 0; private volatile int batchItemsSent = 0; - private volatile int totalByteCount = 0; - private volatile int byteCount = 0; + private volatile int totalByteCount = 0; + private volatile int byteCount = 0; public ElasticsearchPersistWriter() { Config config = StreamsConfigurator.config.getConfig("elasticsearch"); @@ -121,8 +118,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab return totalOk; } - private ObjectMapper mapper = new StreamsJacksonMapper(); - public int getTotalFailed() { return totalFailed; } @@ -155,13 +150,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab return (client != null); } - private ElasticsearchWriterConfiguration config; - - private static final int BYTES_IN_MB = 1024*1024; - private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB; - private volatile int totalByteCount = 0; - private volatile int byteCount = 0; - @Override public void write(StreamsDatum streamsDatum) { @@ -254,6 +242,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab @Override public void prepare(Object configurationObject) { mapper = StreamsJacksonMapper.getInstance(); + veryLargeBulk = this.config.getBulk(); + batchSize = this.config.getBatchSize(); start(); } @@ -395,18 +385,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab } } - private void trackItemAndBytesWritten(long sizeInBytes) - { - currentItems++; - batchItemsSent++; - batchSizeInBytes += sizeInBytes; - - // If our queue is larger than our flush threashold, then we should flush the queue. - if( (batchSizeInBytes > flushThresholdSizeInBytes) || - (currentItems >= batchSize) ) - flushInternal(); - } - private void checkAndCreateBulkRequest() { // Synchronize to ensure that we don't lose any records @@ -512,14 +490,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab return toReturn; } - @Override - public void prepare(Object configurationObject) { - mapper = StreamsJacksonMapper.getInstance(); - veryLargeBulk = this.config.getBulk(); - batchSize = this.config.getBatchSize(); - start(); - } - private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) { bulkRequest.execute().addListener(new ActionListener<BulkResponse>() { @Override @@ -571,40 +541,4 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab flushInternal(); } - private void checkAndCreateBulkRequest() { - // Synchronize to ensure that we don't lose any records - synchronized (this) { - if (bulkRequest == null) - bulkRequest = this.manager.getClient().prepareBulk(); - } - } - - private void checkIndexImplications(String indexName) { - - // check to see if we have seen this index before. - if (this.affectedIndexes.contains(indexName)) - return; - - // we haven't log this index. - this.affectedIndexes.add(indexName); - - // Check to see if we are in 'veryLargeBulk' mode - // if we aren't, exit early - if (!this.veryLargeBulk) - return; - - - // They are in 'very large bulk' mode we want to turn off refreshing the index. - - // Create a request then add the setting to tell it to stop refreshing the interval - UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName); - updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1)); - - // submit to ElasticSearch - this.manager.getClient() - .admin() - .indices() - .updateSettings(updateSettingsRequest) - .actionGet(); - } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml index 8a41ca5..c104810 100644 --- a/streams-contrib/streams-provider-twitter/pom.xml +++ b/streams-contrib/streams-provider-twitter/pom.xml @@ -49,6 +49,11 @@ <artifactId>streams-config</artifactId> </dependency> <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-util</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> @@ -70,6 +75,12 @@ <artifactId>twitter4j-core</artifactId> <version>[4.0,)</version> </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>1.3</version> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -116,6 +127,8 @@ <sourcePath>src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json</sourcePath> <sourcePath>src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json</sourcePath> <sourcePath>src/main/jsonschema/com/twitter/Delete.json</sourcePath> + <sourcePath>src/main/jsonschema/com/twitter/UserstreamEvent.json</sourcePath> + <sourcePath>src/main/jsonschema/com/twitter/FriendList.json</sourcePath> <sourcePath>src/main/jsonschema/com/twitter/Retweet.json</sourcePath> <sourcePath>src/main/jsonschema/com/twitter/tweet.json</sourcePath> </sourcePaths> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java index b577e42..3a8bf6d 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java @@ -2,11 +2,8 @@ package org.apache.streams.twitter.provider; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; -import com.jayway.jsonassert.JsonAssert; import org.apache.commons.lang.StringUtils; -import org.apache.streams.twitter.pojo.Delete; -import org.apache.streams.twitter.pojo.Retweet; -import org.apache.streams.twitter.pojo.Tweet; +import org.apache.streams.twitter.pojo.*; import org.apache.streams.twitter.serializer.StreamsTwitterMapper; import java.io.IOException; @@ -47,6 +44,11 @@ public class TwitterEventClassifier { return Retweet.class; else if( objectNode.findValue("delete") != null ) return Delete.class; + else if( objectNode.findValue("friends") != null || + objectNode.findValue("friends_str") != null ) + return FriendList.class; + else if( objectNode.findValue("target_object") != null ) + return UserstreamEvent.class; else return Tweet.class; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java index ba88803..fb160ef 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java @@ -50,9 +50,9 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat this.config = config; } - protected BlockingQueue hosebirdQueue = new LinkedBlockingQueue<String>(1000); + protected BlockingQueue<String> hosebirdQueue; - protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(1000); + protected volatile Queue<StreamsDatum> providerQueue; protected Hosts hosebirdHosts; protected Authentication auth; @@ -194,6 +194,9 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat LOGGER.debug("host={}\tendpoint={}\taut={}", new Object[] {hosebirdHosts,endpoint,auth}); + hosebirdQueue = new LinkedBlockingQueue<String>(1000); + providerQueue = new LinkedBlockingQueue<StreamsDatum>(1000); + client = new ClientBuilder() .name("apache/streams/streams-contrib/streams-provider-twitter") .hosts(hosebirdHosts) http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java index db1ec76..06ba7be 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java @@ -23,9 +23,7 @@ import twitter4j.json.DataObjectFactory; import java.io.Serializable; import java.math.BigInteger; -import java.util.Iterator; -import java.util.List; -import java.util.Queue; +import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -260,7 +258,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { .setAsyncNumThreads(3) .setRestBaseURL(baseUrl) .setIncludeMyRetweetEnabled(Boolean.TRUE) - .setIncludeRTsEnabled(Boolean.TRUE) + // not sure where this method went... + //.setIncludeRTsEnabled(Boolean.TRUE) .setPrettyDebugEnabled(Boolean.TRUE); return new TwitterFactory(builder.build()).getInstance(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java index bfceae0..0ab3448 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java @@ -31,13 +31,14 @@ import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.ISODateTimeFormat; import java.io.IOException; +import java.io.Serializable; import java.util.List; import java.util.Map; /** * Created by sblackmon on 3/26/14. */ -public class TwitterJsonActivitySerializer implements ActivitySerializer<String> +public class TwitterJsonActivitySerializer implements ActivitySerializer<String>, Serializable { public TwitterJsonActivitySerializer() { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java index 40be0f6..4a0f348 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java @@ -13,6 +13,7 @@ import org.apache.streams.pojo.json.Actor; import org.apache.streams.twitter.pojo.Delete; import org.apache.streams.twitter.pojo.Tweet; +import java.io.Serializable; import java.util.List; import static org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer.*; @@ -24,7 +25,7 @@ import static org.apache.streams.twitter.serializer.TwitterJsonActivitySerialize * Time: 9:24 AM * To change this template use File | Settings | File Templates. */ -public class TwitterJsonDeleteActivitySerializer implements ActivitySerializer<String> { +public class TwitterJsonDeleteActivitySerializer implements ActivitySerializer<String>, Serializable { @Override public String serializationFormat() { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java index 51e39b1..f8ade3f 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java @@ -17,6 +17,7 @@ import org.apache.streams.twitter.pojo.Tweet; import org.apache.streams.twitter.pojo.User; import java.io.IOException; +import java.io.Serializable; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,7 +32,7 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions; * Time: 9:24 AM * To change this template use File | Settings | File Templates. */ -public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<String> { +public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<String>, Serializable { public TwitterJsonRetweetActivitySerializer() { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java index b141482..1d00b7a 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java @@ -18,6 +18,7 @@ import org.apache.streams.twitter.pojo.Tweet; import org.apache.streams.twitter.pojo.User; import java.io.IOException; +import java.io.Serializable; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,11 +33,7 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions; * Time: 9:24 AM * To change this template use File | Settings | File Templates. */ -public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<String> { - - public TwitterJsonTweetActivitySerializer() { - - } +public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<String>, Serializable { @Override public String serializationFormat() { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java new file mode 100644 index 0000000..c0768e3 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java @@ -0,0 +1,92 @@ +package org.apache.streams.twitter.serializer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Strings; +import org.apache.commons.lang.NotImplementedException; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.exceptions.ActivitySerializerException; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.ActivityObject; +import org.apache.streams.pojo.json.Actor; +import org.apache.streams.twitter.pojo.Delete; +import org.apache.streams.twitter.pojo.Tweet; +import org.apache.streams.twitter.pojo.UserstreamEvent; + +import java.util.List; + +import static org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer.*; + +/** +* Created with IntelliJ IDEA. +* User: mdelaet +* Date: 9/30/13 +* Time: 9:24 AM +* To change this template use File | Settings | File Templates. +*/ +public class TwitterJsonUserstreameventActivitySerializer implements ActivitySerializer<String> { + + @Override + public String serializationFormat() { + return null; + } + + @Override + public String serialize(Activity deserialized) throws ActivitySerializerException { + throw new NotImplementedException(); + } + + @Override + public Activity deserialize(String serialized) throws ActivitySerializerException { + return null; + } + + @Override + public List<Activity> deserializeAll(List<String> serializedList) { + return null; + } + + public Activity convert(ObjectNode item) throws ActivitySerializerException { + + ObjectMapper mapper = StreamsTwitterMapper.getInstance(); + UserstreamEvent event = null; + try { + event = mapper.treeToValue(item, UserstreamEvent.class); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + + Activity activity = new Activity(); + activity.setActor(buildActor(event)); + activity.setVerb(detectVerb(event)); + activity.setObject(buildActivityObject(event)); + activity.setId(TwitterJsonActivitySerializer.formatId(activity.getVerb())); + if(Strings.isNullOrEmpty(activity.getId())) + throw new ActivitySerializerException("Unable to determine activity id"); + activity.setProvider(getProvider()); + return activity; + } + + public Actor buildActor(UserstreamEvent event) { + Actor actor = new Actor(); + //actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr())); + return actor; + } + + public ActivityObject buildActivityObject(UserstreamEvent event) { + ActivityObject actObj = new ActivityObject(); + //actObj.setId(formatId(delete.getDelete().getStatus().getIdStr())); + //actObj.setObjectType("tweet"); + return actObj; + } + + public String detectVerb(UserstreamEvent event) { + return null; + } + + public ActivityObject buildTarget(UserstreamEvent event) { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/FriendList.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/FriendList.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/FriendList.json new file mode 100644 index 0000000..5dd7687 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/FriendList.json @@ -0,0 +1,20 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.twitter.pojo.FriendList", + "properties": { + "friends": { + "type": "array", + "items": { + "type": "integer" + } + }, + "friends_str": { + "type": "array", + "items": { + "type": "string" + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json new file mode 100644 index 0000000..07b5883 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json @@ -0,0 +1,47 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.twitter.pojo.UserstreamEvent", + "properties": { + "created_at": { + "type": "string", + "format" : "date-time" + }, + "event_type": { + "type": "string", + "enum" : [ + "access_revoked", + "block", + "unblock", + "favorite", + "unfavorite", + "follow", + "unfollow", + "list_created", + "list_destroyed", + "list_updated", + "list_member_added", + "list_member_removed", + "list_user_subscribed", + "list_user_unsubscribed", + "user_update" + ] + }, + "source": { + "type": "string", + "items": { + "type": "integer" + } + }, + "target": { + "type": "string", + "items": { + "type": "integer" + } + }, + "target_object": { + "type": "object" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java ---------------------------------------------------------------------- diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java index 3286e74..28f297c 100644 --- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java +++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java @@ -7,11 +7,12 @@ import org.apache.streams.data.util.RFC3339Utils; import org.joda.time.DateTime; import java.io.IOException; +import java.io.Serializable; /** * Created by sblackmon on 3/27/14. */ -public class StreamsDateTimeDeserializer extends StdDeserializer<DateTime> { +public class StreamsDateTimeDeserializer extends StdDeserializer<DateTime> implements Serializable { protected StreamsDateTimeDeserializer(Class<DateTime> dateTimeClass) { super(dateTimeClass); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java ---------------------------------------------------------------------- diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java index 26bc157..89f9100 100644 --- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java +++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java @@ -11,11 +11,14 @@ import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.ISODateTimeFormat; import java.io.IOException; +import java.io.Serializable; /** * Created by sblackmon on 3/27/14. */ -public class StreamsDateTimeSerializer extends StdSerializer<DateTime> { +public class StreamsDateTimeSerializer extends StdSerializer<DateTime> implements Serializable { + + protected StreamsDateTimeSerializer(Class<DateTime> dateTimeClass) { super(dateTimeClass); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java ---------------------------------------------------------------------- diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java index ff765f6..9402878 100644 --- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java +++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java @@ -7,8 +7,9 @@ import org.apache.streams.data.util.RFC3339Utils; import org.joda.time.Period; import java.io.IOException; +import java.io.Serializable; -public class StreamsPeriodDeserializer extends StdDeserializer<Period> +public class StreamsPeriodDeserializer extends StdDeserializer<Period> implements Serializable { protected StreamsPeriodDeserializer(Class<Period> dateTimeClass) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java ---------------------------------------------------------------------- diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java index 614cbdd..ef28c8e 100644 --- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java +++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java @@ -8,8 +8,9 @@ import org.joda.time.DateTime; import org.joda.time.Period; import java.io.IOException; +import java.io.Serializable; -public class StreamsPeriodSerializer extends StdSerializer<Period> +public class StreamsPeriodSerializer extends StdSerializer<Period> implements Serializable { protected StreamsPeriodSerializer(Class<Period> dateTimeClass) { super(dateTimeClass);
