attempting to fix jackson date deserialization
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a93bb176 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a93bb176 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a93bb176 Branch: refs/heads/master Commit: a93bb176d1c17322b860966553371fe3fefd5596 Parents: cd4355a Author: sblackmon <[email protected]> Authored: Tue Apr 1 11:43:48 2014 -0500 Committer: sblackmon <[email protected]> Committed: Tue Apr 1 11:43:48 2014 -0500 ---------------------------------------------------------------------- .../streams/twitter/test/SimpleTweetTest.java | 94 ++++++++++++++++++++ .../apache/streams/data/util/RFC3339Utils.java | 7 ++ .../jackson/StreamsDateTimeDeserializer.java | 2 +- .../jackson/StreamsDateTimeSerializer.java | 2 +- .../streams/pig/StreamsProcessDatumExec.java | 81 +++++++++++++++++ .../streams/pig/StreamsProcessDocumentExec.java | 83 +++++++++++++++++ .../streams/pig/StreamsProcessorExec.java | 81 ----------------- .../streams/pig/StreamsSerializerExec.java | 2 - .../src/test/resources/pigprocessortest.pig | 2 +- .../org/apache/streams/util/ComponentUtils.java | 22 +++++ 10 files changed, 290 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java new file mode 100644 index 0000000..8988de0 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java @@ -0,0 +1,94 @@ +package org.apache.streams.twitter.test; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Optional; +import org.apache.commons.lang.StringUtils; +import org.apache.streams.exceptions.ActivitySerializerException; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.twitter.pojo.Delete; +import org.apache.streams.twitter.pojo.Retweet; +import org.apache.streams.twitter.pojo.Tweet; +import org.apache.streams.twitter.provider.TwitterEventClassifier; +import org.apache.streams.twitter.serializer.StreamsTwitterMapper; +import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** +* Created with IntelliJ IDEA. +* User: sblackmon +* Date: 8/20/13 +* Time: 5:57 PM +* To change this template use File | Settings | File Templates. +*/ +public class SimpleTweetTest { + + private final static Logger LOGGER = LoggerFactory.getLogger(SimpleTweetTest.class); + private ObjectMapper mapper = StreamsTwitterMapper.getInstance(); + + private static final String TWITTER_JSON= "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682356047872,\"id_str\":\"410898682356047872\",\"text\":\"RT @ughhblog: RRome (Brooklyn, NY) \\u2013 MY GIRL http:\\/\\/t.co\\/x6uxX9PLsH via @indierapblog @RRoseRRome\",\"source\":\"\\u003ca href=\\\"https:\\/\\/about.twitter.com\\/products\\/tweetdeck\\\" rel=\\\"nofollow\\\"\\u003eTweetDeck\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":70463906,\"id_str\":\"70463906\",\"name\":\"MHM DESIGNS, LLC\",\"screen_name\":\"MHMDESIGNS\",\"location\":\"Los Angeles New York\",\"url\":\"http:\\/\\/www.mhmdesigns.com\",\"description\":\"Multi Media Made Simple- Web desig, Graphic Design, Internet Marketing, Photography, Video Production and much much more.\",\"protected\":false,\"followers_count\":10,\"friends_coun t\":64,\"listed_count\":1,\"created_at\":\"Mon Aug 31 18:31:54 +0000 2009\",\"favourites_count\":0,\"utc_offset\":-28800,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":87,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"9AE4E8\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/33456434\\/body.png\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/33456434\\/body.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/391494416\\/mhm_design_logo__normal.png\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/391494416\\/mhm_design_logo__normal.png\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"BDDCAD\",\"profile_sidebar_fill_color\":\"DDFFCC\",\"profile_text_color\":\"333333\",\"profile_us e_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 10:56:49 +0000 2013\",\"id\":410724848306892800,\"id_str\":\"410724848306892800\",\"text\":\"RRome (Brooklyn, NY) \\u2013 MY GIRL http:\\/\\/t.co\\/x6uxX9PLsH via @indierapblog @RRoseRRome\",\"source\":\"\\u003ca href=\\\"http:\\/\\/twitter.com\\/tweetbutton\\\" rel=\\\"nofollow\\\"\\u003eTweet Button\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":538836510,\"id_str\":\"538836510\",\"name\":\"UGHHBlog\",\"screen_name\":\"ughhblog\",\"location\":\"Los Angeles\",\"url\":\"http:\\/\\/www.undergroundhiphopblog.com\",\"description\":\"http:\\/\\/UN DERGROUNDHIPHOPBLOG.com: A top Indie\\/Underground Hip Hop community blog. Submission Email: [email protected] \\/\\/\\/ Official Host: @pawz1\",\"protected\":false,\"followers_count\":2598,\"friends_count\":373,\"listed_count\":25,\"created_at\":\"Wed Mar 28 05:40:49 +0000 2012\",\"favourites_count\":423,\"utc_offset\":-28800,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":9623,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"131516\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/544717772\\/UGHHBlogLogo.jpg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/544717772\\/UGHHBlogLogo.jpg\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/2583702975\\/uas8528qzzdlnsb7igzn_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.tw img.com\\/profile_images\\/2583702975\\/uas8528qzzdlnsb7igzn_normal.jpeg\",\"profile_link_color\":\"009999\",\"profile_sidebar_border_color\":\"EEEEEE\",\"profile_sidebar_fill_color\":\"EFEFEF\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":4,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/x6uxX9PLsH\",\"expanded_url\":\"http:\\/\\/indierapblog.com\\/rrome-brooklyn-ny-my-girl\\/\",\"display_url\":\"indierapblog.com\\/rrome-brooklyn\\u2026\",\"indices\":[31,53]}],\"user_mentions\":[{\"screen_name\":\"IndieRapBlog\",\"name\":\"IndieRapBlog.com\",\"id\":922776728,\"id_str\":\"922776728\",\"indices\":[58,71]},{\"screen_name\":\"RRoseRRome\",\"name\":\"RRome\",\"id\":76371478,\"id_str\":\"76 371478\",\"indices\":[72,83]}]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"lang\":\"en\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/x6uxX9PLsH\",\"expanded_url\":\"http:\\/\\/indierapblog.com\\/rrome-brooklyn-ny-my-girl\\/\",\"display_url\":\"indierapblog.com\\/rrome-brooklyn\\u2026\",\"indices\":[45,67]}],\"user_mentions\":[{\"screen_name\":\"ughhblog\",\"name\":\"UGHHBlog\",\"id\":538836510,\"id_str\":\"538836510\",\"indices\":[3,12]},{\"screen_name\":\"IndieRapBlog\",\"name\":\"IndieRapBlog.com\",\"id\":922776728,\"id_str\":\"922776728\",\"indices\":[72,85]},{\"screen_name\":\"RRoseRRome\",\"name\":\"RRome\",\"id\":76371478,\"id_str\":\"76371478\",\"indices\":[86,97]}]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}"; + + private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer(); + + + // @Ignore + @Test + public void Tests() + { + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE); + mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); + mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); + + InputStream is = SimpleTweetTest.class.getResourceAsStream("/testtweets.txt"); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); + + ObjectNode event = null; + try { + event = (ObjectNode) mapper.readTree(TWITTER_JSON); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + + assertThat(event, is(not(nullValue()))); + + Tweet tweet = mapper.convertValue(event, Tweet.class); + + assertThat(tweet, is(not(nullValue()))); + assertThat(tweet.getCreatedAt(), is(not(nullValue()))); + assertThat(tweet.getText(), is(not(nullValue()))); + assertThat(tweet.getUser(), is(not(nullValue()))); + + Activity activity = null; + try { + activity = twitterJsonActivitySerializer.deserialize(TWITTER_JSON); + } catch (ActivitySerializerException e) { + e.printStackTrace(); + Assert.fail(); + } + + assertThat(activity, is(not(nullValue()))); + + assertThat(activity.getId(), is(not(nullValue()))); + assertThat(activity.getActor(), is(not(nullValue()))); + assertThat(activity.getActor().getId(), is(not(nullValue()))); + assertThat(activity.getVerb(), is(not(nullValue()))); + assertThat(activity.getProvider(), is(not(nullValue()))); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-pojo/src/main/java/org/apache/streams/data/util/RFC3339Utils.java ---------------------------------------------------------------------- diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/RFC3339Utils.java b/streams-pojo/src/main/java/org/apache/streams/data/util/RFC3339Utils.java index 7af16c4..473d6bb 100644 --- a/streams-pojo/src/main/java/org/apache/streams/data/util/RFC3339Utils.java +++ b/streams-pojo/src/main/java/org/apache/streams/data/util/RFC3339Utils.java @@ -30,6 +30,13 @@ import java.util.regex.Pattern; * Parses and formats Joda Time {@link org.joda.time.DateTime} dates to and from RFC3339 compatible Strings */ public class RFC3339Utils { + + private static final RFC3339Utils INSTANCE = new RFC3339Utils(); + + public static RFC3339Utils getInstance(){ + return INSTANCE; + } + private static final String BASE = "^[0-9]{4}\\-[0-9]{2}\\-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}"; private static final String TZ = "[+-][0-9]{2}:?[0-9]{2}$"; private static final String SUB_SECOND = "\\.([0-9]*)"; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java ---------------------------------------------------------------------- diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java index 8d56c53..3286e74 100644 --- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java +++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java @@ -19,6 +19,6 @@ public class StreamsDateTimeDeserializer extends StdDeserializer<DateTime> { @Override public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException { - return RFC3339Utils.parseUTC(jpar.getValueAsString()); + return RFC3339Utils.getInstance().parseUTC(jpar.getValueAsString()); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java ---------------------------------------------------------------------- diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java index 4677fce..26bc157 100644 --- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java +++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java @@ -23,6 +23,6 @@ public class StreamsDateTimeSerializer extends StdSerializer<DateTime> { @Override public void serialize(DateTime value, JsonGenerator jgen, SerializerProvider provider) throws IOException { - jgen.writeString(RFC3339Utils.format(value)); + jgen.writeString(RFC3339Utils.getInstance().format(value)); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java new file mode 100644 index 0000000..542c106 --- /dev/null +++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java @@ -0,0 +1,81 @@ +package org.apache.streams.pig; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.EvalFunc; +import org.apache.pig.builtin.MonitoredUDF; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.util.UDFContext; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Created by sblackmon on 3/25/14. + */ +@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10) +public class StreamsProcessDatumExec extends EvalFunc<DataBag> { + + TupleFactory mTupleFactory = TupleFactory.getInstance(); + BagFactory mBagFactory = BagFactory.getInstance(); + + StreamsProcessor streamsProcessor; + + public StreamsProcessDatumExec(String... execArgs) throws ClassNotFoundException{ + Preconditions.checkNotNull(execArgs); + Preconditions.checkArgument(execArgs.length > 0); + String classFullName = execArgs[0]; + Preconditions.checkNotNull(classFullName); + String[] constructorArgs = new String[execArgs.length-1]; + ArrayUtils.remove(execArgs, 0); + ArrayUtils.addAll(constructorArgs, execArgs); + streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName)); + streamsProcessor.prepare(null); + } + + @Override + public DataBag exec(Tuple line) throws IOException { + + if (line == null || line.size() == 0) + return null; + + Configuration conf = UDFContext.getUDFContext().getJobConf(); + + String id = (String)line.get(0); + String provider = (String)line.get(1); + Long timestamp = (Long)line.get(2); + String object = (String)line.get(3); + + StreamsDatum entry = new StreamsDatum(object); + + List<StreamsDatum> resultSet = streamsProcessor.process(entry); + List<Tuple> resultTupleList = Lists.newArrayList(); + + for( StreamsDatum resultDatum : resultSet ) { + Tuple tuple = mTupleFactory.newTuple(); + tuple.append(id); + tuple.append(provider); + tuple.append(timestamp); + tuple.append(resultDatum.getDocument()); + resultTupleList.add(tuple); + } + + DataBag result = mBagFactory.newDefaultBag(resultTupleList); + + return result; + + } + + public void finish() { + streamsProcessor.cleanUp(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java new file mode 100644 index 0000000..469aa3f --- /dev/null +++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java @@ -0,0 +1,83 @@ +package org.apache.streams.pig; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.EvalFunc; +import org.apache.pig.builtin.MonitoredUDF; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.util.UDFContext; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Created by sblackmon on 3/25/14. + */ +@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10) +public class StreamsProcessDocumentExec extends EvalFunc<String> { + + TupleFactory mTupleFactory = TupleFactory.getInstance(); + BagFactory mBagFactory = BagFactory.getInstance(); + + StreamsProcessor streamsProcessor; + ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + public StreamsProcessDocumentExec(String... execArgs) throws ClassNotFoundException{ + Preconditions.checkNotNull(execArgs); + Preconditions.checkArgument(execArgs.length > 0); + String processorFullName = execArgs[0]; + Preconditions.checkNotNull(processorFullName); + streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(processorFullName)); + streamsProcessor.prepare(null); + } + + @Override + public String exec(Tuple input) throws IOException { + + Preconditions.checkNotNull(streamsProcessor); + Preconditions.checkNotNull(input); + Preconditions.checkArgument(input.size() == 1); + + String document = (String) input.get(0); + + Preconditions.checkNotNull(document); + + StreamsDatum entry = new StreamsDatum(document); + + Preconditions.checkNotNull(entry); + + List<StreamsDatum> resultSet = streamsProcessor.process(entry); + + Object resultDoc = null; + for( StreamsDatum resultDatum : resultSet ) { + resultDoc = resultDatum.getDocument(); + } + + Preconditions.checkNotNull(resultDoc); + + if( resultDoc instanceof String ) + return (String) resultDoc; + else if( resultDoc instanceof ObjectNode) + return mapper.writeValueAsString(resultDoc); + else + return null; + + } + + public void finish() { + streamsProcessor.cleanUp(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java deleted file mode 100644 index addded4..0000000 --- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java +++ /dev/null @@ -1,81 +0,0 @@ -package org.apache.streams.pig; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.apache.commons.lang.ArrayUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.pig.EvalFunc; -import org.apache.pig.builtin.MonitoredUDF; -import org.apache.pig.data.BagFactory; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; -import org.apache.pig.impl.util.UDFContext; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * Created by sblackmon on 3/25/14. - */ -@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10) -public class StreamsProcessorExec extends EvalFunc<DataBag> { - - TupleFactory mTupleFactory = TupleFactory.getInstance(); - BagFactory mBagFactory = BagFactory.getInstance(); - - StreamsProcessor streamsProcessor; - - public StreamsProcessorExec(String... execArgs) throws ClassNotFoundException{ - Preconditions.checkNotNull(execArgs); - Preconditions.checkArgument(execArgs.length > 0); - String classFullName = execArgs[0]; - Preconditions.checkNotNull(classFullName); - String[] constructorArgs = new String[execArgs.length-1]; - ArrayUtils.remove(execArgs, 0); - ArrayUtils.addAll(constructorArgs, execArgs); - streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName)); - streamsProcessor.prepare(null); - } - - @Override - public DataBag exec(Tuple line) throws IOException { - - if (line == null || line.size() == 0) - return null; - - Configuration conf = UDFContext.getUDFContext().getJobConf(); - - String id = (String)line.get(0); - String provider = (String)line.get(1); - Long timestamp = (Long)line.get(2); - String object = (String)line.get(3); - - StreamsDatum entry = new StreamsDatum(object); - - List<StreamsDatum> resultSet = streamsProcessor.process(entry); - List<Tuple> resultTupleList = Lists.newArrayList(); - - for( StreamsDatum resultDatum : resultSet ) { - Tuple tuple = mTupleFactory.newTuple(); - tuple.append(id); - tuple.append(provider); - tuple.append(timestamp); - tuple.append(resultDatum.getDocument()); - resultTupleList.add(tuple); - } - - DataBag result = mBagFactory.newDefaultBag(resultTupleList); - - return result; - - } - - public void finish() { - streamsProcessor.cleanUp(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java index 46675cb..67f3c61 100644 --- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java +++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java @@ -59,9 +59,7 @@ public class StreamsSerializerExec extends EvalFunc<String> { } catch( Exception e ) { e.printStackTrace(); } - System.out.println("5"); Preconditions.checkNotNull(activity); - System.out.println("6"); return mapper.writeValueAsString(activity); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig b/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig index 6b1511a..054b3af 100644 --- a/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig +++ b/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig @@ -1,4 +1,4 @@ -DEFINE UNWINDER org.apache.streams.pig.StreamsProcessorExec('org.apache.streams.local.test.processors.DoNothingProcessor'); +DEFINE UNWINDER org.apache.streams.pig.StreamsProcessDatumExec('org.apache.streams.local.test.processors.DoNothingProcessor'); activities = LOAD '*' USING PigStorage('\t') AS (activityid: chararray, source: chararray, timestamp: long, object: chararray); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java new file mode 100644 index 0000000..5bf1d53 --- /dev/null +++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java @@ -0,0 +1,22 @@ +package org.apache.streams.util; + +import java.util.Queue; + +/** + * Created by sblackmon on 3/31/14. + */ +public class ComponentUtils { + + public static void offerUntilSuccess(Object entry, Queue queue) { + + boolean success; + do { + synchronized( ComponentUtils.class ) { + success = queue.offer(entry); + } + Thread.yield(); + } + while( !success ); + } + +}
