added prototype pig runtime wrapper for serializers and processors
added serialization exceptions
storm version bump to match maven central


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7f543e5b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7f543e5b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7f543e5b

Branch: refs/heads/master
Commit: 7f543e5b93327cc2fa6f5dc7e901d18b88a22949
Parents: c2a858d
Author: sblackmon <[email protected]>
Authored: Tue Mar 25 18:46:12 2014 -0500
Committer: sblackmon <[email protected]>
Committed: Tue Mar 25 18:46:12 2014 -0500

----------------------------------------------------------------------
 pom.xml                                         |  4 +-
 .../processor/TwitterEventProcessor.java        | 11 ++-
 .../twitter/processor/TwitterTypeConverter.java |  3 +-
 .../TwitterJsonDeleteActivitySerializer.java    |  8 +-
 .../TwitterJsonEventActivitySerializer.java     | 37 ++++-----
 .../TwitterJsonRetweetActivitySerializer.java   | 12 ++-
 .../TwitterJsonTweetActivitySerializer.java     | 53 ++++++++++--
 .../apache/streams/data/ActivitySerializer.java |  5 +-
 .../exceptions/ActivitySerializerException.java | 27 +++++++
 streams-runtimes/pom.xml                        |  1 +
 streams-runtimes/streams-runtime-pig/pom.xml    | 84 ++++++++++++++++++++
 .../streams/pig/StreamsComponentFactory.java    | 58 ++++++++++++++
 .../apache/streams/pig/StreamsPigBuilder.java   | 54 +++++++++++++
 .../streams/pig/StreamsProcessorExec.java       | 77 ++++++++++++++++++
 .../streams/pig/StreamsSerializerExec.java      | 78 ++++++++++++++++++
 .../org/apache/streams/pig/StreamsStorage.java  | 10 +++
 streams-runtimes/streams-runtime-storm/pom.xml  |  4 +-
 17 files changed, 488 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a476c07..185ee79 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,8 +77,8 @@
         <guava.version>16.0.1</guava.version>
         <scala.version>2.8.0</scala.version>
         <clojure.version>1.4.0</clojure.version>
-        <storm.version>0.9.0.1</storm.version>
-        <kafka.version>0.8.0</kafka.version>
+        <storm.version>0.9.1-incubating</storm.version>
+        <kafka.version>0.8.1</kafka.version>
         <zookeeper.version>3.4.5-cdh4.5.0</zookeeper.version>
         <netty.version>3.8.0.Final</netty.version>
         <json-path.version>0.9.0</json-path.version>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
index c3707cb..e76d47c 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
@@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.exceptions.ActivitySerializerException;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
@@ -85,7 +86,7 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
         }
     }
 
-    public Object convert(ObjectNode event, Class inClass, Class outClass) {
+    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException {
 
         Object result = null;
 
@@ -173,7 +174,13 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
             return Lists.newArrayList(new StreamsDatum(json));
         else {
             // convert to desired format
-            Object out = convert(node, inClass, outClass);
+            Object out = null;
+            try {
+                out = convert(node, inClass, outClass);
+            } catch (ActivitySerializerException e) {
+                LOGGER.warn("Failed deserializing", e);
+                return Lists.newArrayList();
+            }
 
             if( out != null && validate(out, outClass))
                 return Lists.newArrayList(new StreamsDatum(out));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
index 73744c1..cd2d1ec 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.exceptions.ActivitySerializerException;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
@@ -58,7 +59,7 @@ public class TwitterTypeConverter implements StreamsProcessor {
         inQueue = inputQueue;
     }
 
-    public Object convert(ObjectNode event, Class inClass, Class outClass) {
+    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException {
 
         Object result = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
index 804f3b4..4e302f7 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
@@ -2,14 +2,14 @@ package org.apache.streams.twitter.serializer;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Strings;
+import org.apache.streams.exceptions.ActivitySerializerException;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Actor;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Tweet;
 
-import java.io.Serializable;
-
 /**
 * Created with IntelliJ IDEA.
 * User: mdelaet
@@ -19,7 +19,7 @@ import java.io.Serializable;
 */
 public class TwitterJsonDeleteActivitySerializer extends TwitterJsonEventActivitySerializer {
 
-    public Activity convert(ObjectNode event) {
+    public Activity convert(ObjectNode event) throws ActivitySerializerException {
 
         Delete delete = null;
         try {
@@ -33,6 +33,8 @@ public class TwitterJsonDeleteActivitySerializer extends TwitterJsonEventActivit
         activity.setVerb("delete");
         activity.setObject(buildActivityObject(delete));
         activity.setId(formatId(activity.getVerb(), delete.getDelete().getStatus().getIdStr()));
+        if(Strings.isNullOrEmpty(activity.getId()))
+            throw new ActivitySerializerException("Unable to determine activity id");
         activity.setProvider(buildProvider(event));
         addTwitterExtension(activity, event);
         return activity;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
index 9d53527..004791c 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
@@ -9,16 +9,18 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.Generator;
 import org.apache.streams.pojo.json.Icon;
 import org.apache.streams.pojo.json.Provider;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.text.DateFormat;
 import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -32,9 +34,10 @@ import java.util.Map;
 */
 public abstract class TwitterJsonEventActivitySerializer implements ActivitySerializer<String>, Serializable {
 
-    public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
+    public static final DateTimeFormatter TWITTER_FORMAT = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy");
+    public static final DateTimeFormatter ACTIVITY_FORMAT = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy");
 
-    ObjectMapper mapper = new ObjectMapper();
+    public static final ObjectMapper mapper = new ObjectMapper();
 
     @Override
     public String serializationFormat() {
@@ -47,10 +50,10 @@ public abstract class TwitterJsonEventActivitySerializer implements ActivitySeri
     }
 
     @Override
-    public Activity deserialize(String serialized) {
+    public Activity deserialize(String serialized) throws ActivitySerializerException {
         serialized = serialized.replaceAll("\\[[ ]*\\]", "null");
 
-//        System.out.println(serialized);
+        System.out.println(serialized);
 
         AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
         mapper.setAnnotationIntrospector(introspector);
@@ -63,8 +66,12 @@ public abstract class TwitterJsonEventActivitySerializer implements ActivitySeri
         try {
             ObjectNode event = (ObjectNode) mapper.readTree(serialized);
 
+            System.out.println(event.toString());
+
             Activity activity = convert(event);
 
+            System.out.println(activity.toString());
+
             return activity;
 
         } catch (IOException e) {
@@ -73,25 +80,19 @@ public abstract class TwitterJsonEventActivitySerializer implements ActivitySeri
 
     }
 
-    public abstract Activity convert(ObjectNode event);
+    public abstract Activity convert(ObjectNode event) throws ActivitySerializerException;
 
     @Override
     public List<Activity> deserializeAll(List<String> serializedList) {
         throw new NotImplementedException("Not currently implemented");
     }
 
-    public static Date parse(String str) {
-        Date date;
+    public static Date parse(String str) throws ParseException {
+        DateTime dateTime;
         String dstr;
-        DateFormat fmt = new SimpleDateFormat(DATE_FORMAT);
-        DateFormat out = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
-        try {
-            date = fmt.parse(str);
-            dstr = out.format(date);
-            return out.parse(dstr);
-        } catch (ParseException e) {
-            throw new IllegalArgumentException("Invalid date format", e);
-        }
+        dateTime = TWITTER_FORMAT.parseDateTime(str);
+        dstr = ACTIVITY_FORMAT.print(dateTime);
+        return ACTIVITY_FORMAT.parseDateTime(dstr).toDate();
     }
 
     public static Generator buildGenerator(ObjectNode event) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
index 69860a1..de96742 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
@@ -2,8 +2,10 @@ package org.apache.streams.twitter.serializer;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Actor;
@@ -27,7 +29,7 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
 */
 public class TwitterJsonRetweetActivitySerializer extends TwitterJsonEventActivitySerializer implements ActivitySerializer<String> {
 
-    public Activity convert(ObjectNode event) {
+    public Activity convert(ObjectNode event) throws ActivitySerializerException {
 
         Retweet retweet = null;
         try {
@@ -41,7 +43,13 @@ public class TwitterJsonRetweetActivitySerializer extends TwitterJsonEventActivi
         activity.setVerb("share");
         activity.setObject(buildActivityObject(retweet.getRetweetedStatus()));
         activity.setId(formatId(activity.getVerb(), retweet.getIdStr()));
-        activity.setPublished(parse(retweet.getCreatedAt()));
+        if(Strings.isNullOrEmpty(activity.getId()))
+            throw new ActivitySerializerException("Unable to determine activity id");
+        try {
+            activity.setPublished(parse(retweet.getCreatedAt()));
+        } catch( Exception e ) {
+            throw new ActivitySerializerException("Unable to determine publishedDate", e);
+        }
         activity.setGenerator(buildGenerator(event));
         activity.setIcon(getIcon(event));
         activity.setProvider(buildProvider(event));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
index 08727d8..25943b5 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
@@ -2,8 +2,11 @@ package org.apache.streams.twitter.serializer;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Actor;
@@ -26,7 +29,11 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
 */
 public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivitySerializer implements ActivitySerializer<String> {
 
-    public Activity convert(ObjectNode event) {
+    public TwitterJsonTweetActivitySerializer() {
+
+    }
+
+    public Activity convert(ObjectNode event) throws ActivitySerializerException {
 
         Tweet tweet = null;
         try {
@@ -35,13 +42,28 @@ public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivity
             e.printStackTrace();
         }
 
+        System.out.println("10");
+
         Activity activity = new Activity();
+
+        System.out.println("11");
+
         activity.setActor(buildActor(tweet));
         activity.setVerb("post");
         activity.setObject(buildActivityObject(tweet));
-        activity.setId(formatId(activity.getVerb(), tweet.getIdStr()));
+        activity.setId(formatId(activity.getVerb(),
+                Optional.fromNullable(
+                        tweet.getIdStr())
+                        .or(Optional.of(tweet.getId().toString()))
+                        .orNull()));
+        if(Strings.isNullOrEmpty(activity.getId()))
+            throw new ActivitySerializerException("Unable to determine activity id");
+        try {
+            activity.setPublished(parse(tweet.getCreatedAt()));
+        } catch( Exception e ) {
+            throw new ActivitySerializerException("Unable to determine publishedDate", e);
+        }
         activity.setTarget(buildTarget(tweet));
-        activity.setPublished(parse(tweet.getCreatedAt()));
         activity.setGenerator(buildGenerator(event));
         activity.setIcon(getIcon(event));
         activity.setProvider(buildProvider(event));
@@ -49,6 +71,9 @@ public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivity
         activity.setContent(tweet.getText());
         activity.setUrl(getUrls(event));
         activity.setLinks(getLinks(tweet));
+
+        System.out.println("12");
+
         addTwitterExtension(activity, event);
         addLocationExtension(activity, tweet);
         return activity;
@@ -57,9 +82,13 @@ public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivity
     public static Actor buildActor(Tweet tweet) {
         Actor actor = new Actor();
         User user = tweet.getUser();
-        actor.setId(formatId(user.getIdStr(), tweet.getIdStr()));
+        actor.setId(formatId(
+                Optional.fromNullable(
+                        user.getIdStr())
+                        .or(Optional.of(user.getId().toString()))
+                        .orNull()
+        ));
         actor.setDisplayName(user.getScreenName());
-        actor.setId(user.getIdStr());
         if (user.getUrl()!=null){
             actor.setUrl(user.getUrl());
         }
@@ -68,8 +97,13 @@ public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivity
 
     public static ActivityObject buildActivityObject(Tweet tweet) {
         ActivityObject actObj = new ActivityObject();
-        actObj.setId(formatId(tweet.getIdStr()));
         actObj.setObjectType("tweet");
+        actObj.setId(formatId(
+                Optional.fromNullable(
+                        tweet.getIdStr())
+                        .or(Optional.of(tweet.getId().toString()))
+                        .orNull()
+        ));
         return actObj;
     }
 
@@ -88,7 +122,12 @@ public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivity
     public static void addLocationExtension(Activity activity, Tweet tweet) {
         Map<String, Object> extensions = ensureExtensions(activity);
         Map<String, Object> location = new HashMap<String, Object>();
-        location.put("id", formatId(tweet.getIdStr()));
+        location.put("id", formatId(
+                Optional.fromNullable(
+                        tweet.getIdStr())
+                        .or(Optional.of(tweet.getId().toString()))
+                        .orNull()
+        ));
         location.put("coordinates", tweet.getCoordinates());
         extensions.put("location", location);
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java b/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
index ae01fed..ad3809f 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.streams.data;
 
+import org.apache.streams.exceptions.ActivitySerializerException;
 import org.apache.streams.pojo.json.Activity;
 
 import java.util.List;
@@ -40,14 +41,14 @@ public interface ActivitySerializer<T> {
      * @param deserialized the string
      * @return a fully populated Activity object
      */
-    T serialize(Activity deserialized);
+    T serialize(Activity deserialized) throws ActivitySerializerException;
 
     /**
      * Converts a POJO into an Activity
      * @param serialized the string representation
      * @return a fully populated Activity object
      */
-    Activity deserialize(T serialized);
+    Activity deserialize(T serialized) throws ActivitySerializerException;
 
     /**
      * Converts multiple documents into a list of Activity objects

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivitySerializerException.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivitySerializerException.java b/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivitySerializerException.java
new file mode 100644
index 0000000..0e5459b
--- /dev/null
+++ b/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivitySerializerException.java
@@ -0,0 +1,27 @@
+package org.apache.streams.exceptions;
+
+/**
+ * Created by sblackmon on 3/25/14.
+ */
+public class ActivitySerializerException extends Exception {
+
+    public ActivitySerializerException() {
+        // TODO Auto-generated constructor stub
+    }
+
+    public ActivitySerializerException(String message) {
+        super(message);
+        // TODO Auto-generated constructor stub
+    }
+
+    public ActivitySerializerException(Throwable cause) {
+        super(cause);
+        // TODO Auto-generated constructor stub
+    }
+
+    public ActivitySerializerException(String message, Throwable cause) {
+        super(message, cause);
+        // TODO Auto-generated constructor stub
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-runtimes/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/pom.xml b/streams-runtimes/pom.xml
index 1abe74e..5d43c28 100644
--- a/streams-runtimes/pom.xml
+++ b/streams-runtimes/pom.xml
@@ -34,6 +34,7 @@
 
     <modules>
         <module>streams-runtime-local</module>
+        <module>streams-runtime-pig</module>
         <module>streams-runtime-storm</module>
         <module>streams-runtime-webapp</module>
     </modules>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-runtimes/streams-runtime-pig/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/pom.xml b/streams-runtimes/streams-runtime-pig/pom.xml
new file mode 100644
index 0000000..7a3b3dc
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streams-project</artifactId>
+        <groupId>org.apache.streams</groupId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>streams-runtime-pig</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-util</artifactId>
+            <version>0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+            <version>4.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>2.0.0-cdh4.5.0.1-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pig</groupId>
+            <artifactId>pig</artifactId>
+            <version>0.11.0-cdh4.5.0.1-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
new file mode 100644
index 0000000..7b9e375
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
@@ -0,0 +1,58 @@
+package org.apache.streams.pig;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.ActivitySerializer;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Created by sblackmon on 3/25/14.
+ */
+public class StreamsComponentFactory {
+
+    public static ActivitySerializer getSerializerInstance(Class<?> serializerClazz) {
+
+        Object object = null;
+        try {
+            object = serializerClazz.getConstructor().newInstance();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        Preconditions.checkNotNull(object);
+
+        ActivitySerializer serializer = (ActivitySerializer) object;
+
+        return serializer;
+
+    }
+
+    public static StreamsProcessor getProcessorInstance(Class<?> clazz, String... args) {
+
+        Object object = null;
+        try {
+            int constructorStringCount = args.length;
+            List<Class> constructorSignature;
+            if( constructorStringCount == 0 )
+                constructorSignature = Lists.newArrayList();
+            else {
+                constructorSignature = Lists.newArrayListWithCapacity(args.length);
+                for (int i = 0; i < constructorStringCount; i++)
+                    constructorSignature.add(String.class);
+            }
+            String[] constructorArgs = args;
+            object = clazz.getConstructor(constructorSignature.toArray(new Class[args.length])).newInstance(constructorArgs);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        StreamsProcessor processor = (StreamsProcessor) object;
+        return processor;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java
new file mode 100644
index 0000000..5b11d7e
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java
@@ -0,0 +1,54 @@
+package org.apache.streams.pig;
+
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+
+/**
+ * Created by sblackmon on 3/25/14.
+ *
+ * Placeholder {@link StreamBuilder} for the Pig runtime. Nothing is implemented yet:
+ * every builder method returns {@code null} (so calls cannot be chained) and
+ * {@link #start()} / {@link #stop()} do nothing.
+ */
+public class StreamsPigBuilder implements StreamBuilder {
+
+    /** Not implemented; the arguments are ignored and {@code null} is returned. */
+    @Override
+    public StreamBuilder addStreamsProcessor(String s, StreamsProcessor streamsProcessor, int i, String... strings) {
+        return null;
+    }
+
+    /** Not implemented; the arguments are ignored and {@code null} is returned. */
+    @Override
+    public StreamBuilder addStreamsPersistWriter(String s, StreamsPersistWriter streamsPersistWriter, int i, String... strings) {
+        return null;
+    }
+
+    /** Not implemented; the arguments are ignored and {@code null} is returned. */
+    @Override
+    public StreamBuilder newPerpetualStream(String s, StreamsProvider streamsProvider) {
+        return null;
+    }
+
+    /** Not implemented; the arguments are ignored and {@code null} is returned. */
+    @Override
+    public StreamBuilder newReadCurrentStream(String s, StreamsProvider streamsProvider) {
+        return null;
+    }
+
+    /** Not implemented; the arguments are ignored and {@code null} is returned. */
+    @Override
+    public StreamBuilder newReadNewStream(String s, StreamsProvider streamsProvider, BigInteger bigInteger) {
+        return null;
+    }
+
+    /** Not implemented; the arguments are ignored and {@code null} is returned. */
+    @Override
+    public StreamBuilder newReadRangeStream(String s, StreamsProvider streamsProvider, DateTime dateTime, DateTime dateTime2) {
+        return null;
+    }
+
+    /** Not implemented; does nothing. */
+    @Override
+    public void start() {
+        // intentionally empty
+    }
+
+    /** Not implemented; does nothing. */
+    @Override
+    public void stop() {
+        // intentionally empty
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java
 
b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java
new file mode 100644
index 0000000..437011c
--- /dev/null
+++ 
b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java
@@ -0,0 +1,77 @@
+package org.apache.streams.pig;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.builtin.MonitoredUDF;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by sblackmon on 3/25/14.
+ *
+ * Pig UDF that passes the document of each input tuple through a
+ * {@link StreamsProcessor} and returns one output tuple per resulting datum.
+ */
+@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 10, intDefault = 10)
+public class StreamsProcessorExec extends EvalFunc<DataBag> {
+
+    TupleFactory mTupleFactory = TupleFactory.getInstance();
+    BagFactory mBagFactory = BagFactory.getInstance();
+
+    StreamsProcessor streamsProcessor;
+
+    /**
+     * Instantiates and prepares the processor named by the UDF arguments.
+     *
+     * @param execArgs first element is the fully qualified processor class name; the
+     *                 remaining elements are passed to that class's String constructor
+     * @throws ClassNotFoundException if the named class cannot be loaded
+     * @throws IllegalArgumentException if no class name is supplied or the processor
+     *         cannot be instantiated
+     */
+    public StreamsProcessorExec(String... execArgs) throws ClassNotFoundException {
+        if (execArgs == null || execArgs.length == 0) {
+            throw new IllegalArgumentException("execArgs must start with the processor class name");
+        }
+        String classFullName = execArgs[0];
+        // ArrayUtils.remove/addAll return new arrays and leave their inputs untouched,
+        // so discarding their results passed only nulls on. Copy the tail explicitly.
+        String[] constructorArgs = Arrays.copyOfRange(execArgs, 1, execArgs.length);
+        streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName), constructorArgs);
+        if (streamsProcessor == null) {
+            throw new IllegalArgumentException("Unable to instantiate processor " + classFullName);
+        }
+        streamsProcessor.prepare(null);
+    }
+
+    /**
+     * Processes one input tuple.
+     *
+     * The tuple is read as (Long id, String source, Long timestamp, String object).
+     * The object is wrapped in a {@link StreamsDatum} and handed to the processor;
+     * each result becomes a tuple (id, source, timestamp, result document).
+     *
+     * @param line input tuple
+     * @return bag of result tuples, or {@code null} when the input is null or empty
+     * @throws IOException if a field cannot be read from the input tuple
+     */
+    @Override
+    public DataBag exec(Tuple line) throws IOException {
+
+        if (line == null || line.size() == 0) {
+            return null;
+        }
+
+        Long id = (Long)line.get(0);
+        String source = (String)line.get(1);
+        Long timestamp = (Long)line.get(2);
+        String object = (String)line.get(3);
+
+        StreamsDatum entry = new StreamsDatum(object);
+
+        List<StreamsDatum> resultSet = streamsProcessor.process(entry);
+        List<Tuple> resultTupleList = Lists.newArrayList();
+
+        // The identifying fields of the input row are repeated on every output row.
+        for( StreamsDatum resultDatum : resultSet ) {
+            Tuple tuple = mTupleFactory.newTuple();
+            tuple.append(id);
+            tuple.append(source);
+            tuple.append(timestamp);
+            tuple.append(resultDatum.getDocument());
+            resultTupleList.add(tuple);
+        }
+
+        return mBagFactory.newDefaultBag(resultTupleList);
+
+    }
+
+    /** Releases the processor's resources by calling its {@code cleanUp()}. */
+    @Override
+    public void finish() {
+        streamsProcessor.cleanUp();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
 
b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
new file mode 100644
index 0000000..1f0e619
--- /dev/null
+++ 
b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
@@ -0,0 +1,78 @@
+package org.apache.streams.pig;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.builtin.MonitoredUDF;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.pojo.json.Activity;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by sblackmon on 3/25/14.
+ *
+ * Pig UDF that deserializes a single-field input tuple into an {@link Activity}
+ * using a configurable {@link ActivitySerializer} and returns the activity as JSON.
+ */
+@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 10, intDefault = 10)
+public class StreamsSerializerExec extends EvalFunc<String> {
+
+    ActivitySerializer activitySerializer;
+    ObjectMapper mapper = new ObjectMapper();
+
+    /**
+     * Instantiates the serializer named by the UDF arguments.
+     *
+     * @param execArgs first element is the fully qualified serializer class name;
+     *                 any further elements are ignored
+     * @throws ClassNotFoundException if the named class cannot be loaded
+     * @throws NullPointerException if {@code execArgs} or the class name is null
+     * @throws IllegalArgumentException if {@code execArgs} is empty
+     */
+    public StreamsSerializerExec(String... execArgs) throws ClassNotFoundException {
+        Preconditions.checkNotNull(execArgs);
+        Preconditions.checkArgument(execArgs.length > 0);
+        String classFullName = execArgs[0];
+        Preconditions.checkNotNull(classFullName);
+        // Only the class name is used: getSerializerInstance takes no constructor arguments.
+        activitySerializer = StreamsComponentFactory.getSerializerInstance(Class.forName(classFullName));
+    }
+
+    /**
+     * Converts one document to its Activity JSON form.
+     *
+     * @param input tuple holding exactly one String field, the document to deserialize
+     * @return the deserialized activity written as a JSON string
+     * @throws IOException if the document cannot be deserialized or the activity
+     *         cannot be written as JSON
+     * @throws NullPointerException if the input, the document or the deserialized
+     *         activity is null
+     * @throws IllegalArgumentException if the tuple does not have exactly one field
+     */
+    @Override
+    public String exec(Tuple input) throws IOException {
+
+        Preconditions.checkNotNull(activitySerializer);
+        Preconditions.checkNotNull(input);
+        Preconditions.checkArgument(input.size() == 1);
+
+        String document = (String) input.get(0);
+        Preconditions.checkNotNull(document);
+
+        Activity activity;
+        try {
+            activity = activitySerializer.deserialize(document);
+        } catch( Exception e ) {
+            // Report the real failure with its cause instead of printing it and then
+            // failing on the null check below.
+            throw new IOException(
+                    "Unable to deserialize document with " + activitySerializer.getClass().getName(), e);
+        }
+        Preconditions.checkNotNull(activity);
+
+        return mapper.writeValueAsString(activity);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java
 
b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java
new file mode 100644
index 0000000..38609b7
--- /dev/null
+++ 
b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java
@@ -0,0 +1,10 @@
+package org.apache.streams.pig;
+
+import org.apache.pig.builtin.PigStorage;
+
+/**
+ * Created by sblackmon on 3/25/14.
+ *
+ * Streams-specific storage function for Pig. It currently adds nothing to
+ * {@link PigStorage}; all behaviour is inherited.
+ */
+public class StreamsStorage extends PigStorage {
+    // No overrides yet.
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7f543e5b/streams-runtimes/streams-runtime-storm/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/pom.xml 
b/streams-runtimes/streams-runtime-storm/pom.xml
index 819eb82..810d277 100644
--- a/streams-runtimes/streams-runtime-storm/pom.xml
+++ b/streams-runtimes/streams-runtime-storm/pom.xml
@@ -61,10 +61,12 @@
         <dependency>
             <groupId>storm</groupId>
             <artifactId>storm-core</artifactId>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>storm</groupId>
             <artifactId>storm-netty</artifactId>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.scala-lang</groupId>
@@ -76,7 +78,7 @@
         <dependency>
             <groupId>com.101tec</groupId>
             <artifactId>zkclient</artifactId>
-            <version>0.2</version>
+            <version>0.3</version>
             <scope>compile</scope>
             <exclusions>
                 <exclusion>

Reply via email to