STREAMS-105 | Updated the InstagramTypeConverter to use the conversion utility functions provided in InstagramActivityUtil
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/fa3f9220 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/fa3f9220 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/fa3f9220 Branch: refs/heads/STREAMS-46 Commit: fa3f92200d19605e931954a154e59123d1a36f03 Parents: 1163653 Author: Robert Douglas <[email protected]> Authored: Wed Jul 2 10:48:35 2014 -0500 Committer: Robert Douglas <[email protected]> Committed: Wed Jul 2 10:48:35 2014 -0500 ---------------------------------------------------------------------- .../processor/InstagramTypeConverter.java | 116 +++---------------- .../serializer/util/InstagramActivityUtil.java | 32 +++-- 2 files changed, 34 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fa3f9220/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java index 14260e3..7fb1ec6 100644 --- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java +++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java @@ -18,52 +18,34 @@ package org.apache.streams.instagram.processor; -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.instagram.serializer.InstagramJsonActivitySerializer; -import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.instagram.serializer.util.InstagramActivityUtil; import org.apache.streams.pojo.json.Activity; import org.jinstagram.entity.users.feed.MediaFeedData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.List; import java.util.Queue; -/** - * Created by sblackmon on 12/10/13. - */ public class InstagramTypeConverter implements StreamsProcessor { public final static String STREAMS_ID = "InstagramTypeConverter"; private final static Logger LOGGER = LoggerFactory.getLogger(InstagramTypeConverter.class); - private ObjectMapper mapper; - private Queue<MediaFeedData> inQueue; private Queue<StreamsDatum> outQueue; - private Class inClass; - private Class outClass; - - private InstagramJsonActivitySerializer instagramJsonActivitySerializer; + private InstagramActivityUtil instagramActivityUtil; private int count = 0; public final static String TERMINATE = new String("TERMINATE"); - public InstagramTypeConverter(Class inClass, Class outClass) { - this.inClass = inClass; - this.outClass = outClass; + public InstagramTypeConverter() { } public Queue<StreamsDatum> getProcessorOutputQueue() { @@ -74,100 +56,31 @@ public class InstagramTypeConverter implements StreamsProcessor { inQueue = inputQueue; } - public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException { - - Object result = null; - - if( outClass.equals( Activity.class )) { - LOGGER.debug("ACTIVITY"); - result = instagramJsonActivitySerializer.deserialize( - mapper.writeValueAsString(event)); - } else if( outClass.equals( ObjectNode.class )) { - LOGGER.debug("OBJECTNODE"); - result = mapper.convertValue(event, ObjectNode.class); - } else if( outClass.equals( String.class )) { - LOGGER.debug("OBJECTNODE"); - result = mapper.writeValueAsString(event); - } - - - // no supported conversion were applied - if( result != null ) { - count ++; - return result; - } - - LOGGER.debug("CONVERT FAILED"); - - return null; - - } - - public boolean validate(Object document, Class klass) { - - // TODO - return true; - } - - public boolean isValidJSON(final String json) { - boolean valid = false; - try { - final JsonParser parser = new ObjectMapper().getJsonFactory() - .createJsonParser(json); - while (parser.nextToken() != null) { - } - valid = true; - } catch (JsonParseException jpe) { - LOGGER.warn("validate: {}", jpe); - } catch (IOException ioe) { - LOGGER.warn("validate: {}", ioe); - } - - return valid; - } - @Override public List<StreamsDatum> process(StreamsDatum entry) { StreamsDatum result = null; try { - Object item = entry.getDocument(); - ObjectNode node; LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass()); - if( item instanceof String ) { - - // if the target is string, just pass-through - if( String.class.equals(outClass)) { - result = entry; - } - else { - // first check for valid json - node = (ObjectNode)mapper.readTree((String)item); + if(item instanceof MediaFeedData) { + //We don't need to use the mapper, since we have a process to convert between + //MediaFeedData objects and Activity objects already + Activity activity = new Activity(); - Object out = convert(node, String.class, outClass); + instagramActivityUtil.updateActivity((MediaFeedData)item, activity); - if( out != null && validate(out, outClass)) - result = new StreamsDatum(out); + if(activity.getId() != null) { + result = new StreamsDatum(activity); + count++; } - - } else if( item instanceof ObjectNode ) { - - // first check for valid json - node = (ObjectNode)mapper.valueToTree(item); - - Object out = convert(node, ObjectNode.class, outClass); - - if( out != null && validate(out, outClass)) - result = new StreamsDatum(out); - } - } catch (Exception e) { e.printStackTrace(); + LOGGER.error("Exception while converting MediaFeedData to Activity: {}", e.getMessage()); } if( result != null ) @@ -178,13 +91,12 @@ public class InstagramTypeConverter implements StreamsProcessor { @Override public void prepare(Object o) { - mapper = new StreamsJacksonMapper(); - instagramJsonActivitySerializer = new InstagramJsonActivitySerializer(); + instagramActivityUtil = new InstagramActivityUtil(); } @Override public void cleanUp() { - + //noop } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fa3f9220/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java index 0561ba7..bd926d5 100644 --- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java +++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java @@ -53,7 +53,9 @@ public class InstagramActivityUtil { */ public static void updateActivity(MediaFeedData item, Activity activity) throws ActivitySerializerException { activity.setActor(buildActor(item)); - activity.setPublished(new DateTime(Long.parseLong(item.getCreatedTime()) * 1000)); + + if(item.getCreatedTime() != null) + activity.setPublished(new DateTime(Long.parseLong(item.getCreatedTime()) * 1000)); activity.setId(formatId(activity.getVerb(), Optional.fromNullable( @@ -78,16 +80,20 @@ public class InstagramActivityUtil { public static Actor buildActor(MediaFeedData item) { Actor actor = new Actor(); - Image image = new Image(); - image.setUrl(item.getUser().getProfilePictureUrl()); + try { + Image image = new Image(); + image.setUrl(item.getUser().getProfilePictureUrl()); - Map<String, Object> extensions = new HashMap<String, Object>(); - extensions.put("screenName", item.getUser().getUserName()); + Map<String, Object> extensions = new HashMap<String, Object>(); + extensions.put("screenName", item.getUser().getUserName()); - actor.setId(formatId(String.valueOf(item.getUser().getId()))); - actor.setImage(image); - actor.setAdditionalProperty("extensions", extensions); - actor.setAdditionalProperty("handle", item.getUser().getUserName()); + actor.setId(formatId(String.valueOf(item.getUser().getId()))); + actor.setImage(image); + actor.setAdditionalProperty("extensions", extensions); + actor.setAdditionalProperty("handle", item.getUser().getUserName()); + } catch (Exception e) { + LOGGER.error("Exception trying to build actor object: {}", e.getMessage()); + } return actor; } @@ -244,9 +250,11 @@ public class InstagramActivityUtil { addLocationExtension(activity, item); - Map<String, Object> likes = new HashMap<String, Object>(); - likes.put("count", item.getLikes().getCount()); - extensions.put("likes", likes); + if(item.getLikes() != null) { + Map<String, Object> likes = new HashMap<String, Object>(); + likes.put("count", item.getLikes().getCount()); + extensions.put("likes", likes); + } extensions.put("hashtags", item.getTags());
