Added prototype Pig runtime wrapper for serializers and processors; added serialization exceptions; bumped Storm version to match Maven Central.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7f543e5b Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7f543e5b Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7f543e5b Branch: refs/heads/master Commit: 7f543e5b93327cc2fa6f5dc7e901d18b88a22949 Parents: c2a858d Author: sblackmon <[email protected]> Authored: Tue Mar 25 18:46:12 2014 -0500 Committer: sblackmon <[email protected]> Committed: Tue Mar 25 18:46:12 2014 -0500 ---------------------------------------------------------------------- pom.xml | 4 +- .../processor/TwitterEventProcessor.java | 11 ++- .../twitter/processor/TwitterTypeConverter.java | 3 +- .../TwitterJsonDeleteActivitySerializer.java | 8 +- .../TwitterJsonEventActivitySerializer.java | 37 ++++----- .../TwitterJsonRetweetActivitySerializer.java | 12 ++- .../TwitterJsonTweetActivitySerializer.java | 53 ++++++++++-- .../apache/streams/data/ActivitySerializer.java | 5 +- .../exceptions/ActivitySerializerException.java | 27 +++++++ streams-runtimes/pom.xml | 1 + streams-runtimes/streams-runtime-pig/pom.xml | 84 ++++++++++++++++++++ .../streams/pig/StreamsComponentFactory.java | 58 ++++++++++++++ .../apache/streams/pig/StreamsPigBuilder.java | 54 +++++++++++++ .../streams/pig/StreamsProcessorExec.java | 77 ++++++++++++++++++ .../streams/pig/StreamsSerializerExec.java | 78 ++++++++++++++++++ .../org/apache/streams/pig/StreamsStorage.java | 10 +++ streams-runtimes/streams-runtime-storm/pom.xml | 4 +- 17 files changed, 488 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a476c07..185ee79 100644 --- a/pom.xml +++ b/pom.xml @@ -77,8 +77,8 @@ <guava.version>16.0.1</guava.version> 
<scala.version>2.8.0</scala.version> <clojure.version>1.4.0</clojure.version> - <storm.version>0.9.0.1</storm.version> - <kafka.version>0.8.0</kafka.version> + <storm.version>0.9.1-incubating</storm.version> + <kafka.version>0.8.1</kafka.version> <zookeeper.version>3.4.5-cdh4.5.0</zookeeper.version> <netty.version>3.8.0.Final</netty.version> <json-path.version>0.9.0</json-path.version> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java index c3707cb..e76d47c 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.exceptions.ActivitySerializerException; import org.apache.streams.pojo.json.Activity; import org.apache.streams.twitter.pojo.Delete; import org.apache.streams.twitter.pojo.Retweet; @@ -85,7 +86,7 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable { } } - public Object convert(ObjectNode event, Class inClass, Class outClass) { + public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException { Object result = null; @@ -173,7 +174,13 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable { 
return Lists.newArrayList(new StreamsDatum(json)); else { // convert to desired format - Object out = convert(node, inClass, outClass); + Object out = null; + try { + out = convert(node, inClass, outClass); + } catch (ActivitySerializerException e) { + LOGGER.warn("Failed deserializing", e); + return Lists.newArrayList(); + } if( out != null && validate(out, outClass)) return Lists.newArrayList(new StreamsDatum(out)); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java index 73744c1..cd2d1ec 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.exceptions.ActivitySerializerException; import org.apache.streams.pojo.json.Activity; import org.apache.streams.twitter.pojo.Delete; import org.apache.streams.twitter.pojo.Retweet; @@ -58,7 +59,7 @@ public class TwitterTypeConverter implements StreamsProcessor { inQueue = inputQueue; } - public Object convert(ObjectNode event, Class inClass, Class outClass) { + public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException { Object result = null; 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java index 804f3b4..4e302f7 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java @@ -2,14 +2,14 @@ package org.apache.streams.twitter.serializer; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Strings; +import org.apache.streams.exceptions.ActivitySerializerException; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; import org.apache.streams.pojo.json.Actor; import org.apache.streams.twitter.pojo.Delete; import org.apache.streams.twitter.pojo.Tweet; -import java.io.Serializable; - /** * Created with IntelliJ IDEA. 
* User: mdelaet @@ -19,7 +19,7 @@ import java.io.Serializable; */ public class TwitterJsonDeleteActivitySerializer extends TwitterJsonEventActivitySerializer { - public Activity convert(ObjectNode event) { + public Activity convert(ObjectNode event) throws ActivitySerializerException { Delete delete = null; try { @@ -33,6 +33,8 @@ public class TwitterJsonDeleteActivitySerializer extends TwitterJsonEventActivit activity.setVerb("delete"); activity.setObject(buildActivityObject(delete)); activity.setId(formatId(activity.getVerb(), delete.getDelete().getStatus().getIdStr())); + if(Strings.isNullOrEmpty(activity.getId())) + throw new ActivitySerializerException("Unable to determine activity id"); activity.setProvider(buildProvider(event)); addTwitterExtension(activity, event); return activity; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java index 9d53527..004791c 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java @@ -9,16 +9,18 @@ import com.google.common.base.Joiner; import com.google.common.collect.Lists; import org.apache.commons.lang.NotImplementedException; import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.exceptions.ActivitySerializerException; import org.apache.streams.pojo.json.Activity; import 
org.apache.streams.pojo.json.Generator; import org.apache.streams.pojo.json.Icon; import org.apache.streams.pojo.json.Provider; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; import java.io.IOException; import java.io.Serializable; -import java.text.DateFormat; import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import java.util.Map; @@ -32,9 +34,10 @@ import java.util.Map; */ public abstract class TwitterJsonEventActivitySerializer implements ActivitySerializer<String>, Serializable { - public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy"; + public static final DateTimeFormatter TWITTER_FORMAT = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy"); + public static final DateTimeFormatter ACTIVITY_FORMAT = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy"); - ObjectMapper mapper = new ObjectMapper(); + public static final ObjectMapper mapper = new ObjectMapper(); @Override public String serializationFormat() { @@ -47,10 +50,10 @@ public abstract class TwitterJsonEventActivitySerializer implements ActivitySeri } @Override - public Activity deserialize(String serialized) { + public Activity deserialize(String serialized) throws ActivitySerializerException { serialized = serialized.replaceAll("\\[[ ]*\\]", "null"); -// System.out.println(serialized); + System.out.println(serialized); AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory()); mapper.setAnnotationIntrospector(introspector); @@ -63,8 +66,12 @@ public abstract class TwitterJsonEventActivitySerializer implements ActivitySeri try { ObjectNode event = (ObjectNode) mapper.readTree(serialized); + System.out.println(event.toString()); + Activity activity = convert(event); + System.out.println(activity.toString()); + return activity; } catch (IOException e) { @@ -73,25 +80,19 @@ public abstract class 
TwitterJsonEventActivitySerializer implements ActivitySeri } - public abstract Activity convert(ObjectNode event); + public abstract Activity convert(ObjectNode event) throws ActivitySerializerException; @Override public List<Activity> deserializeAll(List<String> serializedList) { throw new NotImplementedException("Not currently implemented"); } - public static Date parse(String str) { - Date date; + public static Date parse(String str) throws ParseException { + DateTime dateTime; String dstr; - DateFormat fmt = new SimpleDateFormat(DATE_FORMAT); - DateFormat out = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); - try { - date = fmt.parse(str); - dstr = out.format(date); - return out.parse(dstr); - } catch (ParseException e) { - throw new IllegalArgumentException("Invalid date format", e); - } + dateTime = TWITTER_FORMAT.parseDateTime(str); + dstr = ACTIVITY_FORMAT.print(dateTime); + return ACTIVITY_FORMAT.parseDateTime(dstr).toDate(); } public static Generator buildGenerator(ObjectNode event) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java index 69860a1..de96742 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java @@ -2,8 +2,10 @@ package org.apache.streams.twitter.serializer; import 
com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.exceptions.ActivitySerializerException; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; import org.apache.streams.pojo.json.Actor; @@ -27,7 +29,7 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions; */ public class TwitterJsonRetweetActivitySerializer extends TwitterJsonEventActivitySerializer implements ActivitySerializer<String> { - public Activity convert(ObjectNode event) { + public Activity convert(ObjectNode event) throws ActivitySerializerException { Retweet retweet = null; try { @@ -41,7 +43,13 @@ public class TwitterJsonRetweetActivitySerializer extends TwitterJsonEventActivi activity.setVerb("share"); activity.setObject(buildActivityObject(retweet.getRetweetedStatus())); activity.setId(formatId(activity.getVerb(), retweet.getIdStr())); - activity.setPublished(parse(retweet.getCreatedAt())); + if(Strings.isNullOrEmpty(activity.getId())) + throw new ActivitySerializerException("Unable to determine activity id"); + try { + activity.setPublished(parse(retweet.getCreatedAt())); + } catch( Exception e ) { + throw new ActivitySerializerException("Unable to determine publishedDate", e); + } activity.setGenerator(buildGenerator(event)); activity.setIcon(getIcon(event)); activity.setProvider(buildProvider(event)); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java index 08727d8..25943b5 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java @@ -2,8 +2,11 @@ package org.apache.streams.twitter.serializer; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Optional; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.exceptions.ActivitySerializerException; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; import org.apache.streams.pojo.json.Actor; @@ -26,7 +29,11 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions; */ public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivitySerializer implements ActivitySerializer<String> { - public Activity convert(ObjectNode event) { + public TwitterJsonTweetActivitySerializer() { + + } + + public Activity convert(ObjectNode event) throws ActivitySerializerException { Tweet tweet = null; try { @@ -35,13 +42,28 @@ public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivity e.printStackTrace(); } + System.out.println("10"); + Activity activity = new Activity(); + + System.out.println("11"); + activity.setActor(buildActor(tweet)); activity.setVerb("post"); activity.setObject(buildActivityObject(tweet)); - activity.setId(formatId(activity.getVerb(), tweet.getIdStr())); + activity.setId(formatId(activity.getVerb(), + Optional.fromNullable( + tweet.getIdStr()) + .or(Optional.of(tweet.getId().toString())) + .orNull())); + 
if(Strings.isNullOrEmpty(activity.getId())) + throw new ActivitySerializerException("Unable to determine activity id"); + try { + activity.setPublished(parse(tweet.getCreatedAt())); + } catch( Exception e ) { + throw new ActivitySerializerException("Unable to determine publishedDate", e); + } activity.setTarget(buildTarget(tweet)); - activity.setPublished(parse(tweet.getCreatedAt())); activity.setGenerator(buildGenerator(event)); activity.setIcon(getIcon(event)); activity.setProvider(buildProvider(event)); @@ -49,6 +71,9 @@ public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivity activity.setContent(tweet.getText()); activity.setUrl(getUrls(event)); activity.setLinks(getLinks(tweet)); + + System.out.println("12"); + addTwitterExtension(activity, event); addLocationExtension(activity, tweet); return activity; @@ -57,9 +82,13 @@ public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivity public static Actor buildActor(Tweet tweet) { Actor actor = new Actor(); User user = tweet.getUser(); - actor.setId(formatId(user.getIdStr(), tweet.getIdStr())); + actor.setId(formatId( + Optional.fromNullable( + user.getIdStr()) + .or(Optional.of(user.getId().toString())) + .orNull() + )); actor.setDisplayName(user.getScreenName()); - actor.setId(user.getIdStr()); if (user.getUrl()!=null){ actor.setUrl(user.getUrl()); } @@ -68,8 +97,13 @@ public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivity public static ActivityObject buildActivityObject(Tweet tweet) { ActivityObject actObj = new ActivityObject(); - actObj.setId(formatId(tweet.getIdStr())); actObj.setObjectType("tweet"); + actObj.setId(formatId( + Optional.fromNullable( + tweet.getIdStr()) + .or(Optional.of(tweet.getId().toString())) + .orNull() + )); return actObj; } @@ -88,7 +122,12 @@ public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivity public static void addLocationExtension(Activity activity, Tweet tweet) { Map<String, Object> 
extensions = ensureExtensions(activity); Map<String, Object> location = new HashMap<String, Object>(); - location.put("id", formatId(tweet.getIdStr())); + location.put("id", formatId( + Optional.fromNullable( + tweet.getIdStr()) + .or(Optional.of(tweet.getId().toString())) + .orNull() + )); location.put("coordinates", tweet.getCoordinates()); extensions.put("location", location); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java b/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java index ae01fed..ad3809f 100644 --- a/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java +++ b/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java @@ -18,6 +18,7 @@ package org.apache.streams.data; +import org.apache.streams.exceptions.ActivitySerializerException; import org.apache.streams.pojo.json.Activity; import java.util.List; @@ -40,14 +41,14 @@ public interface ActivitySerializer<T> { * @param deserialized the string * @return a fully populated Activity object */ - T serialize(Activity deserialized); + T serialize(Activity deserialized) throws ActivitySerializerException; /** * Converts a POJO into an Activity * @param serialized the string representation * @return a fully populated Activity object */ - Activity deserialize(T serialized); + Activity deserialize(T serialized) throws ActivitySerializerException; /** * Converts multiple documents into a list of Activity objects http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivitySerializerException.java ---------------------------------------------------------------------- diff --git 
a/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivitySerializerException.java b/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivitySerializerException.java new file mode 100644 index 0000000..0e5459b --- /dev/null +++ b/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivitySerializerException.java @@ -0,0 +1,27 @@ +package org.apache.streams.exceptions; + +/** + * Created by sblackmon on 3/25/14. + */ +public class ActivitySerializerException extends Exception { + + public ActivitySerializerException() { + // TODO Auto-generated constructor stub + } + + public ActivitySerializerException(String message) { + super(message); + // TODO Auto-generated constructor stub + } + + public ActivitySerializerException(Throwable cause) { + super(cause); + // TODO Auto-generated constructor stub + } + + public ActivitySerializerException(String message, Throwable cause) { + super(message, cause); + // TODO Auto-generated constructor stub + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-runtimes/pom.xml ---------------------------------------------------------------------- diff --git a/streams-runtimes/pom.xml b/streams-runtimes/pom.xml index 1abe74e..5d43c28 100644 --- a/streams-runtimes/pom.xml +++ b/streams-runtimes/pom.xml @@ -34,6 +34,7 @@ <modules> <module>streams-runtime-local</module> + <module>streams-runtime-pig</module> <module>streams-runtime-storm</module> <module>streams-runtime-webapp</module> </modules> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-runtimes/streams-runtime-pig/pom.xml ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/pom.xml b/streams-runtimes/streams-runtime-pig/pom.xml new file mode 100644 index 0000000..7a3b3dc --- /dev/null +++ b/streams-runtimes/streams-runtime-pig/pom.xml @@ -0,0 +1,84 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache 
Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>streams-project</artifactId> + <groupId>org.apache.streams</groupId> + <version>0.1-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>streams-runtime-pig</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-config</artifactId> + <version>0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-core</artifactId> + <version>0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-util</artifactId> + <version>0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-collections4</artifactId> + <version>4.0</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + 
<version>2.0.0-cdh4.5.0.1-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.pig</groupId> + <artifactId>pig</artifactId> + <version>0.11.0-cdh4.5.0.1-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + <sourceDirectory>src/main/java</sourceDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + <testResources> + <testResource> + <directory>src/test/resources</directory> + </testResource> + </testResources> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java new file mode 100644 index 0000000..7b9e375 --- /dev/null +++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java @@ -0,0 +1,58 @@ +package org.apache.streams.pig; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.lang.ArrayUtils; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.data.ActivitySerializer; + +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Created by sblackmon on 3/25/14. 
+ */ +public class StreamsComponentFactory { + + public static ActivitySerializer getSerializerInstance(Class<?> serializerClazz) { + + Object object = null; + try { + object = serializerClazz.getConstructor().newInstance(); + } catch (Exception e) { + e.printStackTrace(); + } + + Preconditions.checkNotNull(object); + + ActivitySerializer serializer = (ActivitySerializer) object; + + return serializer; + + } + + public static StreamsProcessor getProcessorInstance(Class<?> clazz, String... args) { + + Object object = null; + try { + int constructorStringCount = args.length; + List<Class> constructorSignature; + if( constructorStringCount == 0 ) + constructorSignature = Lists.newArrayList(); + else { + constructorSignature = Lists.newArrayListWithCapacity(args.length); + for (int i = 0; i < constructorStringCount; i++) + constructorSignature.add(String.class); + } + String[] constructorArgs = args; + object = clazz.getConstructor(constructorSignature.toArray(new Class[args.length])).newInstance(constructorArgs); + } catch (Exception e) { + e.printStackTrace(); + } + StreamsProcessor processor = (StreamsProcessor) object; + return processor; + + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java new file mode 100644 index 0000000..5b11d7e --- /dev/null +++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java @@ -0,0 +1,54 @@ +package org.apache.streams.pig; + +import org.apache.streams.core.StreamBuilder; +import org.apache.streams.core.StreamsPersistWriter; +import org.apache.streams.core.StreamsProcessor; +import 
org.apache.streams.core.StreamsProvider; +import org.joda.time.DateTime; + +import java.math.BigInteger; + +/** + * Created by sblackmon on 3/25/14. + */ +public class StreamsPigBuilder implements StreamBuilder { + @Override + public StreamBuilder addStreamsProcessor(String s, StreamsProcessor streamsProcessor, int i, String... strings) { + return null; + } + + @Override + public StreamBuilder addStreamsPersistWriter(String s, StreamsPersistWriter streamsPersistWriter, int i, String... strings) { + return null; + } + + @Override + public StreamBuilder newPerpetualStream(String s, StreamsProvider streamsProvider) { + return null; + } + + @Override + public StreamBuilder newReadCurrentStream(String s, StreamsProvider streamsProvider) { + return null; + } + + @Override + public StreamBuilder newReadNewStream(String s, StreamsProvider streamsProvider, BigInteger bigInteger) { + return null; + } + + @Override + public StreamBuilder newReadRangeStream(String s, StreamsProvider streamsProvider, DateTime dateTime, DateTime dateTime2) { + return null; + } + + @Override + public void start() { + + } + + @Override + public void stop() { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java new file mode 100644 index 0000000..437011c --- /dev/null +++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java @@ -0,0 +1,77 @@ +package org.apache.streams.pig; + +import com.google.common.collect.Lists; +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.EvalFunc; +import 
org.apache.pig.builtin.MonitoredUDF; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.util.UDFContext; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Pig UDF that passes the document field of each input tuple through a {@link StreamsProcessor} + * and returns the resulting documents as a bag of tuples. + * + * Created by sblackmon on 3/25/14. + */ +@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 10, intDefault = 10) +public class StreamsProcessorExec extends EvalFunc<DataBag> { + + TupleFactory mTupleFactory = TupleFactory.getInstance(); + BagFactory mBagFactory = BagFactory.getInstance(); + + StreamsProcessor streamsProcessor; + + /** + * Instantiates and prepares the wrapped processor. + * + * @param execArgs fully-qualified processor class name, followed by the processor's constructor arguments + * @throws ClassNotFoundException if the named class cannot be loaded + * @throws IllegalArgumentException if no class name is supplied + */ + public StreamsProcessorExec(String... execArgs) throws ClassNotFoundException{ + if (execArgs == null || execArgs.length == 0) { + throw new IllegalArgumentException("StreamsProcessorExec requires the processor class name as its first argument"); + } + String classFullName = execArgs[0]; + /* ArrayUtils.remove/addAll return new arrays and leave their arguments untouched, so the tail is copied explicitly. */ + String[] constructorArgs = Arrays.copyOfRange(execArgs, 1, execArgs.length); + streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName), constructorArgs); + streamsProcessor.prepare(null); + } + + /** + * Runs the processor on field 3 of the tuple and emits one (id, source, timestamp, document) tuple per result. + * + * @param line tuple read as (Long id, String source, Long timestamp, String document) + * @return bag of result tuples (empty when the processor returns null), or null for a null or empty input tuple + * @throws IOException propagated from reading the tuple fields + */ + @Override + public DataBag exec(Tuple line) throws IOException { + + if (line == null || line.size() == 0) { + return null; + } + + Long id = (Long)line.get(0); + String source = (String)line.get(1); + Long timestamp = (Long)line.get(2); + String object = (String)line.get(3); + + StreamsDatum entry = new StreamsDatum(object); + + List<StreamsDatum> resultSet = streamsProcessor.process(entry); + List<Tuple> resultTupleList = Lists.newArrayList(); + + /* Guard against a processor that returns null instead of an empty list. */ + if (resultSet != null) { + for( StreamsDatum resultDatum : resultSet ) { + Tuple tuple = mTupleFactory.newTuple(); + tuple.append(id); + tuple.append(source); + tuple.append(timestamp); + tuple.append(resultDatum.getDocument()); + resultTupleList.add(tuple); + } + } + + DataBag result = 
mBagFactory.newDefaultBag(resultTupleList); + + return result; + + } + + /** Releases the wrapped processor by calling its cleanUp(). */ + @Override + public void finish() { + streamsProcessor.cleanUp(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java new file mode 100644 index 0000000..1f0e619 --- /dev/null +++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java @@ -0,0 +1,78 @@ +package org.apache.streams.pig; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.EvalFunc; +import org.apache.pig.builtin.MonitoredUDF; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.util.UDFContext; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.pojo.json.Activity; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Created by sblackmon on 3/25/14. + */ +@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 10, intDefault = 10) +public class StreamsSerializerExec extends EvalFunc<String> { + + ActivitySerializer activitySerializer; + ObjectMapper mapper = new ObjectMapper(); + + public StreamsSerializerExec(String... 
execArgs) throws ClassNotFoundException{ + /* execArgs[0] is the fully-qualified ActivitySerializer class name; any further arguments are ignored because getSerializerInstance is called with the class only. */ + Preconditions.checkNotNull(execArgs); + Preconditions.checkArgument(execArgs.length > 0); + String classFullName = execArgs[0]; + Preconditions.checkNotNull(classFullName); + activitySerializer = StreamsComponentFactory.getSerializerInstance(Class.forName(classFullName)); + } + + /** + * Deserializes the single String field of the tuple into an {@link Activity} and returns it as JSON. + * + * @param input tuple whose only field is the raw document + * @return the activity as written by the Jackson mapper + * @throws IOException if the field cannot be read, the serializer fails, or the JSON cannot be written + * @throws NullPointerException if the input, the document, or the deserialized activity is null + */ + @Override + public String exec(Tuple input) throws IOException { + + Preconditions.checkNotNull(activitySerializer); + Preconditions.checkNotNull(input); + Preconditions.checkArgument(input.size() == 1); + + String document = (String) input.get(0); + + Preconditions.checkNotNull(document); + Activity activity; + try { + activity = activitySerializer.deserialize(document); + } catch( Exception e ) { + /* Keep the cause instead of printing it and failing afterwards with a bare NullPointerException. */ + throw new IOException("Failed to deserialize document into an Activity", e); + } + Preconditions.checkNotNull(activity, "serializer returned a null Activity"); + + return mapper.writeValueAsString(activity); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java new file mode 100644 index 0000000..38609b7 --- /dev/null +++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java @@ -0,0 +1,10 @@ +package 
org.apache.streams.pig; + +import org.apache.pig.builtin.PigStorage; + +/** + * Pig storage function for the Streams Pig runtime. + * + * Currently an empty subclass of {@link PigStorage}: it declares no members and overrides nothing. + * + * Created by sblackmon on 3/25/14. + */ +public class StreamsStorage extends PigStorage { + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-runtimes/streams-runtime-storm/pom.xml ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-storm/pom.xml b/streams-runtimes/streams-runtime-storm/pom.xml index 819eb82..810d277 100644 --- a/streams-runtimes/streams-runtime-storm/pom.xml +++ b/streams-runtimes/streams-runtime-storm/pom.xml @@ -61,10 +61,12 @@ <dependency> <groupId>storm</groupId> <artifactId>storm-core</artifactId> + <scope>provided</scope> </dependency> <dependency> <groupId>storm</groupId> <artifactId>storm-netty</artifactId> + <scope>provided</scope> </dependency> <dependency> <groupId>org.scala-lang</groupId> @@ -76,7 +78,7 @@ <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> - <version>0.2</version> + <version>0.3</version> <scope>compile</scope> <exclusions> <exclusion>
