Making activity.links an array of strings, so it's useful; adding processor-urls (still debugging)
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9e757aed Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9e757aed Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9e757aed Branch: refs/heads/master Commit: 9e757aed94d61c93391af2da1f7f3789dbcc1d93 Parents: b59bcd2 Author: sblackmon <[email protected]> Authored: Fri Mar 28 21:26:49 2014 -0400 Committer: sblackmon <[email protected]> Committed: Fri Mar 28 21:26:49 2014 -0400 ---------------------------------------------------------------------- streams-contrib/pom.xml | 2 +- streams-contrib/streams-processor-urls/pom.xml | 65 ++ .../main/java/org/apache/streams/urls/Link.java | 57 ++ .../org/apache/streams/urls/LinkUnwinder.java | 372 ++++++++++ .../streams/urls/LinkUnwinderProcessor.java | 76 ++ .../streams/urls/TestLinkUnwinderProcessor.java | 76 ++ .../serializer/DatasiftActivitySerializer.java | 4 +- .../TwitterJsonActivitySerializer.java | 124 ++++ .../TwitterJsonTweetActivitySerializer.java | 6 +- .../twitter/test/TweetActivitySerDeTest.java | 118 ++++ .../streams/twitter/test/TweetSerDeTest.java | 14 + .../src/test/resources/testtweets.txt | 695 +++++++++++++++++++ .../jackson/StreamsDateTimeDeserializer.java | 32 + .../jackson/StreamsDateTimeSerializer.java | 28 + .../streams/jackson/StreamsJacksonModule.java | 16 + .../org/apache/streams/pojo/json/activity.json | 4 +- 16 files changed, 1681 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml index 4f7a22f..67948c2 100644 --- a/streams-contrib/pom.xml +++ b/streams-contrib/pom.xml @@ -43,7 +43,7 @@ <module>streams-persist-hdfs</module> 
<module>streams-persist-kafka</module> <module>streams-persist-mongo</module> - <module>streams-processor-urlredirect</module> + <module>streams-processor-urls</module> <module>streams-provider-datasift</module> <module>streams-provider-facebook</module> <module>streams-provider-google</module> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-processor-urls/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-urls/pom.xml b/streams-contrib/streams-processor-urls/pom.xml new file mode 100644 index 0000000..966155f --- /dev/null +++ b/streams-contrib/streams-processor-urls/pom.xml @@ -0,0 +1,65 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <artifactId>streams-processor-urls</artifactId> + <version>0.1-SNAPSHOT</version> + + <parent> + <groupId>org.apache.streams</groupId> + <artifactId>streams-contrib</artifactId> + <version>0.1-SNAPSHOT</version> + </parent> + + <dependencies> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-config</artifactId> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-pojo</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + <dependency> + <groupId>org.jsonschema2pojo</groupId> + <artifactId>jsonschema2pojo-core</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + 
<artifactId>junit</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + </dependencies> + + <build> + <sourceDirectory>src/main/java</sourceDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + <testResources> + <testResource> + <directory>src/test/resources</directory> + </testResource> + </testResources> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/Link.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/Link.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/Link.java new file mode 100644 index 0000000..ec282f5 --- /dev/null +++ b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/Link.java @@ -0,0 +1,57 @@ +package org.apache.streams.urls; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +public interface Link +{ + @JsonProperty("status") + public LinkStatus getStatus(); + + @JsonProperty("originalUrl") + public String getOriginalURL(); + + @JsonProperty("wasRedirected") + public boolean wasRedirected(); + + @JsonProperty("finalUrl") + public String getFinalURL(); + + @JsonProperty("domain") + public String getDomain(); + + @JsonProperty("normalizedUrl") + public String getNormalizedURL(); + + @JsonProperty("urlParts") + public List<String> getUrlParts(); + + @JsonProperty("finalStatusCode") + public String getFinalResponseCode(); + + @JsonProperty("isTracked") + public boolean isTracked(); + + @JsonProperty("redirects") + public List<String> getRedirects(); + + @JsonProperty("tookInMillis") + public long getTookInMillis(); + + public void 
run(); + + public enum LinkStatus { + SUCCESS, + ERROR, + MALFORMED_URL, + NOT_FOUND, + FORBIDDEN, + REDIRECT_ERROR, + UNAUTHORIZED, + LOOP, + HTTP_ERROR_STATUS, + EXCEPTION + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinder.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinder.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinder.java new file mode 100644 index 0000000..a4a28f1 --- /dev/null +++ b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinder.java @@ -0,0 +1,372 @@ +package org.apache.streams.urls; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLDecoder; +import java.util.*; + +/** + * References: + * Some helpful references to help + * Purpose URL + * ------------- ---------------------------------------------------------------- + * [Status Codes] http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html + * [Test Cases] http://greenbytes.de/tech/tc/httpredirects/ + * [t.co behavior] https://dev.twitter.com/docs/tco-redirection-behavior + */ + +public class LinkUnwinder implements Link +{ + private final static Logger LOGGER = LoggerFactory.getLogger(LinkUnwinder.class); + + private static final int MAX_ALLOWED_REDIRECTS = 30; + private static final int DEFAULT_HTTP_TIMEOUT = 5000; //originally 30000 + private static final String LOCATION_IDENTIFIER = "location"; + private static final String SET_COOKIE_IDENTIFIER = "set-cookie"; + + private Date startTime = new Date(); + private String originalURL; + private 
LinkStatus status; + private String finalURL; + private String domain; + private boolean wasRedirected; + private List<String> redirects = new ArrayList<String>(); + private boolean isTracked = false; + private int finalResponseCode; + private Collection<String> cookies; + + private String normalizedUrl; + private List<String> urlParts; + + private int redirectCount = 0; + private long tookInMillis = 0; + + private static final Collection<String> BOTS_ARE_OK = new ArrayList<String>() {{ + add("t.co"); + }}; + + private static final Collection<String> URL_TRACKING_TO_REMOVE = new ArrayList<String>() {{ + /****************************************************************** + * Google uses parameters in the URL string to track referrers + * on their Google Analytics and promotions. These are the + * identified URL patterns. + * + * URL: + * https://support.google.com/analytics/answer/1033867?hl=en + *****************************************************************/ + + // Required. Use utm_source to identify a search engine, newsletter name, or other source. + add("([\\?&])utm_source(=)[^&?]*"); + + // Required. Use utm_medium to identify a medium such as email or cost-per- click. + add("([\\?&])utm_medium(=)[^&?]*"); + + // Used for paid search. Use utm_term to note the keywords for this ad. + add("([\\?&])utm_term(=)[^&?]*"); + + // Used for A/B testing and content-targeted ads. Use utm_content to differentiate ads or links that point to the same + add("([\\?&])utm_content(=)[^&?]*"); + + // Used for keyword analysis. Use utm_campaign to identify a specific product promotion or strategic campaign. 
+ add("([\\?&])utm_campaign(=)[^&?]*"); + }}; + + public boolean isFailure() { return false; } + public String getOriginalURL() { return this.originalURL; } + public LinkStatus getStatus() { return this.status; } + public String getDomain() { return this.domain; } + public String getFinalURL() { return this.finalURL; } + public List<String> getRedirects() { return this.redirects; } + public boolean wasRedirected() { return this.wasRedirected; } + public boolean isTracked() { return this.isTracked; } + public String getFinalResponseCode() { return Integer.toString(this.finalResponseCode); } + public long getTookInMillis() { return this.tookInMillis; } + public String getNormalizedURL() { return this.normalizedUrl; } + public List<String> getUrlParts() { return this.urlParts; } + + public LinkUnwinder(String originalURL) { + this.originalURL = originalURL; + } + + public void run() { + // we are going to try three times just incase we catch the service off-guard + // this is mainly to help us with our tests. 
+ for(int i = 0; (i < 3) && this.finalURL == null ; i++) { + if(this.status != LinkStatus.SUCCESS) + unwindLink(this.originalURL); + } + this.finalURL = cleanURL(this.finalURL); + this.normalizedUrl = normalizeURL(this.finalURL); + this.urlParts = tokenizeURL(this.normalizedUrl); + + this.updateTookInMillis(); + } + + protected void updateTookInMillis() { + this.tookInMillis = new Date().getTime() - this.startTime.getTime(); + } + + public void unwindLink(String url) + { + // Check to see if they wound up in a redirect loop + if((this.redirectCount > 0 && (this.originalURL.equals(url) || this.redirects.contains(url))) || (this.redirectCount > MAX_ALLOWED_REDIRECTS)) + { + this.status = LinkStatus.LOOP; + return; + } + + if(!this.originalURL.equals(url)) + this.redirects.add(url); + + HttpURLConnection connection = null; + + try + { + URL thisURL = new URL(url); + connection = (HttpURLConnection)new URL(url).openConnection(); + + // now we are going to pretend that we are a browser... + // This is the way my mac works. + if(!BOTS_ARE_OK.contains(thisURL.getHost())) + { + connection.addRequestProperty("Host", thisURL.getHost()); + connection.addRequestProperty("Connection", "Keep-Alive"); + connection.addRequestProperty("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.48 Safari/537.36"); + connection.addRequestProperty("Accept-Language", "en-US,en;q=0.8,zh;q=0.6"); + + // the test to seattlemamadoc.com prompted this change. + // they auto detect bots by checking the referrer chain and the 'user-agent' + // this broke the t.co test. 
t.co URLs are EXPLICITLY ok with bots + // there is a list for URLS that behave this way at the top in BOTS_ARE_OK + // smashew 2013-13-2013 + + if(this.redirectCount > 0 && BOTS_ARE_OK.contains(thisURL.getHost())) + connection.addRequestProperty("Referrer", this.originalURL); + } + + connection.setReadTimeout(DEFAULT_HTTP_TIMEOUT); + connection.setConnectTimeout(DEFAULT_HTTP_TIMEOUT); + + connection.setInstanceFollowRedirects(false); + + if(this.cookies != null) + for (String cookie : cookies) + connection.addRequestProperty("Cookie", cookie.split(";", 1)[0]); + + connection.connect(); + + this.finalResponseCode = connection.getResponseCode(); + + /************** + * + */ + Map<String,List<String>> headers = createCaseInsenitiveMap(connection.getHeaderFields()); + /****************************************************************** + * If they want us to set cookies, well, then we will set cookies + * Example URL: + * http://nyti.ms/1bCpesx + *****************************************************************/ + if(headers.containsKey(SET_COOKIE_IDENTIFIER)) + this.cookies = headers.get(SET_COOKIE_IDENTIFIER); + + switch (this.finalResponseCode) + { + case 200: // HTTP OK + this.finalURL = connection.getURL().toString(); + this.domain = new URL(this.finalURL).getHost(); + this.status = LinkStatus.SUCCESS; + break; + case 300: // Multiple choices + case 301: // URI has been moved permanently + case 302: // Found + case 303: // Primarily for a HTTP Post + case 304: // Not Modified + case 306: // This status code is unused but in the redirect block. + case 307: // Temporary re-direct + /******************************************************************* + * Author: + * Smashew + * + * Date: 2013-11-15 + * + * Note: + * It is possible that we have already found our final URL. In + * the event that we have found our final URL, we are going to + * save this URL as long as it isn't the original URL. 
+ * We are still going to ask the browser to re-direct, but in the + * case of yet another redirect, seen with the redbull test + * this can be followed by a 304, a browser, by W3C standards would + * still render the page with it's content, but for us to assert + * a success, we are really hoping for a 304 message. + *******************************************************************/ + if(!this.originalURL.toLowerCase().equals(connection.getURL().toString().toLowerCase())) + this.finalURL = connection.getURL().toString(); + if(!headers.containsKey(LOCATION_IDENTIFIER)) + { + LOGGER.info("Headers: {}", headers); + this.status = LinkStatus.REDIRECT_ERROR; + } + else + { + this.wasRedirected = true; + this.redirectCount++; + unwindLink(connection.getHeaderField(LOCATION_IDENTIFIER)); + } + break; + case 305: // User must use the specified proxy (deprecated by W3C) + break; + case 401: // Unauthorized (nothing we can do here) + this.status = LinkStatus.UNAUTHORIZED; + break; + case 403: // HTTP Forbidden (Nothing we can do here) + this.status = LinkStatus.FORBIDDEN; + break; + case 404: // Not Found (Page is not found, nothing we can do with a 404) + this.status = LinkStatus.NOT_FOUND; + break; + case 500: // Internal Server Error + case 501: // Not Implemented + case 502: // Bad Gateway + case 503: // Service Unavailable + case 504: // Gateway Timeout + case 505: // Version not supported + this.status = LinkStatus.HTTP_ERROR_STATUS; + break; + default: + LOGGER.info("Unrecognized HTTP Response Code: {}", this.finalResponseCode); + this.status = LinkStatus.NOT_FOUND; + break; + } + } + catch (MalformedURLException e) + { + // the URL is trash, so, it can't load it. + this.status = LinkStatus.MALFORMED_URL; + } + catch (IOException ex) + { + // there was an issue we are going to set to error. + this.status = LinkStatus.ERROR; + } + catch (Exception ex) + { + // there was an unknown issue we are going to set to exception. 
+ this.status = LinkStatus.EXCEPTION; + } + finally + { + if (connection != null) + connection.disconnect(); + } + } + + private Map<String,List<String>> createCaseInsenitiveMap(Map<String,List<String>> input) { + Map<String,List<String>> toReturn = new HashMap<String, List<String>>(); + for(String k : input.keySet()) + if(k != null && input.get(k) != null) + toReturn.put(k.toLowerCase(), input.get(k)); + return toReturn; + } + + private String cleanURL(String url) + { + // If they pass us a null URL then we are going to pass that right back to them. + if(url == null) + return null; + + // remember how big the URL was at the start + int startLength = url.length(); + + // Iterate through all the known URL parameters of tracking URLs + for(String pattern : URL_TRACKING_TO_REMOVE) + url = url.replaceAll(pattern, ""); + + // If the URL is smaller than when it came in. Then it had tracking information + if(url.length() < startLength) + this.isTracked = true; + + // return our url. + return url; + } + + /** + * Removes the protocol, if it exists, from the front and + * removes any random encoding characters + * Extend this to do other url cleaning/pre-processing + * @param url - The String URL to normalize + * @return normalizedUrl - The String URL that has no junk or surprises + */ + public static String normalizeURL(String url) + { + // Decode URL to remove any %20 type stuff + String normalizedUrl = url; + try { + // I've used a URLDecoder that's part of Java here, + // but this functionality exists in most modern languages + // and is universally called url decoding + normalizedUrl = URLDecoder.decode(url, "UTF-8"); + } + catch(UnsupportedEncodingException uee) + { + System.err.println("Unable to Decode URL. 
Decoding skipped."); + uee.printStackTrace(); + } + + // Remove the protocol, http:// ftp:// or similar from the front + if (normalizedUrl.contains("://")) + normalizedUrl = normalizedUrl.split(":/{2}")[1]; + + // Room here to do more pre-processing + + return normalizedUrl; + } + + /** + * Goal is to get the different parts of the URL path. This can be used + * in a classifier to help us determine if we are working with + * + * Reference: + * http://stackoverflow.com/questions/10046178/pattern-matching-for-url-classification + * @param url - Url to be tokenized + * @return tokens - A String array of all the tokens + */ + public static List<String> tokenizeURL(String url) + { + url = normalizeURL(url); + // I assume that we're going to use the whole URL to find tokens in + // If you want to just look in the GET parameters, or you want to ignore the domain + // or you want to use the domain as a token itself, that would have to be + // processed above the next line, and only the remaining parts split + List<String> toReturn = new ArrayList<String>(); + + // Split the URL by forward slashes. Most modern browsers will accept a URL + // this malformed such as http://www.smashew.com/hello//how////are/you + // hence the '+' in the regular expression. + for(String part: url.split("/+")) + toReturn.add(part.toLowerCase()); + + // return our object. + return toReturn; + + // One could alternatively use a more complex regex to remove more invalid matches + // but this is subject to your (?:in)?ability to actually write the regex you want + + // These next two get rid of tokens that are too short, also. 
+ + // Destroys anything that's not alphanumeric and things that are + // alphanumeric but only 1 character long + //String[] tokens = url.split("(?:[\\W_]+\\w)*[\\W_]+"); + + // Destroys anything that's not alphanumeric and things that are + // alphanumeric but only 1 or 2 characters long + //String[] tokens = url.split("(?:[\\W_]+\\w{1,2})*[\\W_]+"); + } + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java new file mode 100644 index 0000000..45ec04d --- /dev/null +++ b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java @@ -0,0 +1,76 @@ +package org.apache.streams.urls; + +import com.google.common.collect.Lists; +import org.apache.streams.urls.Link; +import org.apache.streams.urls.LinkUnwinder; +import org.apache.commons.lang.NotImplementedException; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.pojo.json.Activity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +/** + * References: + * Some helpful references to help + * Purpose URL + * ------------- ---------------------------------------------------------------- + * [Status Codes] http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html + * [Test Cases] http://greenbytes.de/tech/tc/httpredirects/ + * [t.co behavior] https://dev.twitter.com/docs/tco-redirection-behavior + */ + +public class LinkUnwinderProcessor implements StreamsProcessor +{ + private final static String STREAMS_ID = "LinkUnwinderProcessor"; 
+ + private final static Logger LOGGER = LoggerFactory.getLogger(LinkUnwinderProcessor.class); + + + + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + + List<StreamsDatum> result = Lists.newArrayList(); + + LOGGER.debug("{} processing {}", STREAMS_ID, entry.getDocument().getClass()); + + // get list of shared urls + if( entry.getDocument() instanceof Activity) { + Activity activity = (Activity) entry.getDocument(); + List<String> inputLinks = activity.getLinks(); + List<String> outputLinks = Lists.newArrayList(); + for( String link : inputLinks ) { + try { + LinkUnwinder unwinder = new LinkUnwinder((String)link); + unwinder.run(); + if( !unwinder.isFailure()) { + outputLinks.add(unwinder.getFinalURL()); + } + } catch (Exception e) { + //if unwindable drop + LOGGER.debug("Failed to unwind link : {}", link); + LOGGER.debug("Excpetion unwind link : {}", e); + } + } + activity.setLinks(outputLinks); + entry.setDocument(activity); + result.add(entry); + + return result; + } + else throw new NotImplementedException(); + } + + @Override + public void prepare(Object o) { + } + + @Override + public void cleanUp() { + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java new file mode 100644 index 0000000..94ae2d2 --- /dev/null +++ b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java @@ -0,0 +1,76 @@ +package org.apache.streams.urls; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import 
com.google.common.collect.Sets; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.jackson.StreamsJacksonModule; +import org.apache.streams.pojo.json.Activity; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import static org.junit.Assert.*; + +import java.util.List; +import java.util.Scanner; + +/** + * Created by rebanks on 2/27/14. + */ +public class TestLinkUnwinderProcessor { + + private static String activityString; + + @Test + public void testActivityLinkUnwinderProcessorBitly() throws Exception{ + testActivityUnwinderHelper(Lists.newArrayList("http://bit.ly/1cX5Rh4"), Lists.newArrayList("http://www.wcgworld.com/")); + } + + @Test + public void testActivityLinkUnwinderProcessorGoogle() throws Exception{ + testActivityUnwinderHelper(Lists.newArrayList("http://goo.gl/wSrHDA"), Lists.newArrayList("http://www.wcgworld.com/")); + } + + @Test + public void testActivityLinkUnwinderProcessorOwly() throws Exception{ + testActivityUnwinderHelper(Lists.newArrayList("http://ow.ly/u4Kte"), Lists.newArrayList("http://www.wcgworld.com/")); + } + + @Test + public void testActivityLinkUnwinderProcessorGoDaddy() throws Exception{ + testActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt"), Lists.newArrayList("http://www.wcgworld.com/")); + } + + @Test + public void testActivityLinkUnwinderProcessorMulti() throws Exception{ + testActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt", "http://ow.ly/u4Kte", "http://goo.gl/wSrHDA"), Lists.newArrayList("http://www.wcgworld.com/", "http://www.wcgworld.com/", "http://www.wcgworld.com/")); + } + + @Test + public void testActivityLinkUnwinderProcessorUnwindable() throws Exception{ + testActivityUnwinderHelper(Lists.newArrayList("http://bit.ly/1cX5Rh4", "http://nope@#$%"), Lists.newArrayList("http://www.wcgworld.com/")); + } + + public void testActivityUnwinderHelper(List<String> input, List<String> expected) throws Exception{ + ObjectMapper mapper = new 
ObjectMapper(); + mapper.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + mapper.registerModule(new StreamsJacksonModule()); + Activity activity = new Activity(); + activity.setLinks(input); + StreamsDatum datum = new StreamsDatum(activity); + LinkUnwinderProcessor processor = new LinkUnwinderProcessor(); + processor.prepare(null); + List<StreamsDatum> result = processor.process(datum); + assertNotNull(result); + assertEquals(1, result.size()); + StreamsDatum resultDatum = result.get(0); + assertNotNull(resultDatum); + assertTrue(resultDatum.getDocument() instanceof Activity); + Activity resultActivity = (Activity) resultDatum.getDocument(); + assertNotNull(resultActivity.getLinks()); + List<String> resultLinks = resultActivity.getLinks(); + assertEquals(expected.size(), resultLinks.size()); + assertEquals(Sets.newHashSet(expected), Sets.newHashSet(resultLinks)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java index 455a579..de65636 100644 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java +++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java @@ -159,8 +159,8 @@ public class DatasiftActivitySerializer implements ActivitySerializer<Datasift>, return actObj; } - public static List<Object> getLinks(Interaction interaction) { - List<Object> links = Lists.newArrayList(); + public 
static List<String> getLinks(Interaction interaction) { + List<String> links = Lists.newArrayList(); return links; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java new file mode 100644 index 0000000..fceff2c --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java @@ -0,0 +1,124 @@ +package org.apache.streams.twitter.serializer; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.AnnotationIntrospector; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import org.apache.commons.lang.NotImplementedException; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.exceptions.ActivitySerializerException; +import org.apache.streams.jackson.StreamsJacksonModule; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.Provider; +import 
org.apache.streams.twitter.pojo.Delete; +import org.apache.streams.twitter.pojo.Retweet; +import org.apache.streams.twitter.pojo.Tweet; +import org.apache.streams.twitter.provider.TwitterEventClassifier; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Created by sblackmon on 3/26/14. + */ +public class TwitterJsonActivitySerializer implements ActivitySerializer<String> +{ + + public TwitterJsonActivitySerializer() { + + } + + public static final DateTimeFormatter TWITTER_FORMAT = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy"); + public static final DateTimeFormatter ACTIVITY_FORMAT = ISODateTimeFormat.basicDateTime(); + + public static ObjectMapper mapper; + static { + mapper = new ObjectMapper(); + mapper.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + mapper.registerModule(new StreamsJacksonModule() { + { + addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) { + @Override + public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException { + return TWITTER_FORMAT.parseDateTime(jpar.getValueAsString()); + } + }); + } + }); + //AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory()); + //mapper.setAnnotationIntrospector(introspector); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE); + mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.TRUE); + mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); + mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); + mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.FALSE); + 
mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, Boolean.TRUE); + mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + + } + + TwitterJsonTweetActivitySerializer tweetActivitySerializer = new TwitterJsonTweetActivitySerializer(); + TwitterJsonRetweetActivitySerializer retweetActivitySerializer = new TwitterJsonRetweetActivitySerializer(); + TwitterJsonDeleteActivitySerializer deleteActivitySerializer = new TwitterJsonDeleteActivitySerializer(); + + @Override + public String serializationFormat() { + return null; + } + + @Override + public String serialize(Activity deserialized) throws ActivitySerializerException { + throw new NotImplementedException(); + } + + @Override + public Activity deserialize(String serialized) throws ActivitySerializerException { + + Class documentSubType = TwitterEventClassifier.detectClass(serialized); + + Activity activity; + if( documentSubType == Tweet.class ) + activity = tweetActivitySerializer.deserialize(serialized); + else if( documentSubType == Retweet.class ) + activity = retweetActivitySerializer.deserialize(serialized); + else if( documentSubType == Delete.class ) + activity = deleteActivitySerializer.deserialize(serialized); + else throw new ActivitySerializerException("unrecognized type"); + + return activity; + } + + @Override + public List<Activity> deserializeAll(List<String> serializedList) { + throw new NotImplementedException(); + } + + public static Provider getProvider() { + Provider provider = new Provider(); + provider.setId("id:providers:twitter"); + return provider; + } + + public static void addTwitterExtension(Activity activity, ObjectNode event) { + Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity); + extensions.put("twitter", event); + } + + public static String formatId(String... 
idparts) { + return Joiner.on(":").join(Lists.asList("id:twitter", idparts)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java index 8bcb60b..a038792 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java @@ -81,8 +81,6 @@ public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<St activity.setUrl("http://twitter.com/" + tweet.getIdStr()); activity.setLinks(getLinks(tweet)); - System.out.println("12"); - addTwitterExtension(activity, TwitterJsonActivitySerializer.mapper.convertValue(tweet, ObjectNode.class)); addLocationExtension(activity, tweet); return activity; @@ -109,8 +107,8 @@ public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<St return actor; } - public static List<Object> getLinks(Tweet tweet) { - List<Object> links = Lists.newArrayList(); + public static List<String> getLinks(Tweet tweet) { + List<String> links = Lists.newArrayList(); if( tweet.getEntities().getUrls() != null ) { for (Url url : tweet.getEntities().getUrls()) { links.add(url.getExpandedUrl()); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java 
---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java new file mode 100644 index 0000000..494f698 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java @@ -0,0 +1,118 @@ +package org.apache.streams.twitter.test; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.commons.lang.StringUtils; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.twitter.pojo.Retweet; +import org.apache.streams.twitter.pojo.Tweet; +import org.apache.streams.twitter.provider.TwitterEventClassifier; +import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; + +import static java.util.regex.Pattern.matches; +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** +* Created with IntelliJ IDEA. +* User: sblackmon +* Date: 8/20/13 +* Time: 5:57 PM +* To change this template use File | Settings | File Templates. 
+*/ +public class TweetActivitySerDeTest { + + private final static Logger LOGGER = LoggerFactory.getLogger(TweetActivitySerDeTest.class); + + private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer(); + + // @Ignore + @Test + public void Tests() + { + InputStream is = TweetActivitySerDeTest.class.getResourceAsStream("/testtweets.txt"); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); + + try { + while (br.ready()) { + String line = br.readLine(); + if(!StringUtils.isEmpty(line)) + { + LOGGER.info("raw: {}", line); + + Class detected = TwitterEventClassifier.detectClass(line); + + Activity activity = twitterJsonActivitySerializer.deserialize(line); + + String activitystring = TwitterJsonActivitySerializer.mapper.writeValueAsString(activity); + + LOGGER.info("activity: {}", activitystring); + + assertThat(activity, is(not(nullValue()))); + + assertThat(activity.getId(), is(not(nullValue()))); + assertThat(activity.getActor(), is(not(nullValue()))); + assertThat(activity.getActor().getId(), is(not(nullValue()))); + assertThat(activity.getVerb(), is(not(nullValue()))); + assertThat(activity.getProvider(), is(not(nullValue()))); + + if( detected == Tweet.class ) { + assertThat(activity.getObject(), is(nullValue())); + + assertEquals(activity.getVerb(), "post"); + + Tweet tweet = TwitterJsonActivitySerializer.mapper.readValue(line, Tweet.class); + + if( tweet.getEntities() != null && + tweet.getEntities().getUrls() != null && + tweet.getEntities().getUrls().size() > 0 ) { + + + assertThat(activity.getLinks(), is(not(nullValue()))); + assertEquals(tweet.getEntities().getUrls().size(), activity.getLinks().size()); + } + + } else if( detected == Retweet.class ) { + + Retweet retweet = TwitterJsonActivitySerializer.mapper.readValue(line, Retweet.class); + + assertThat(retweet.getRetweetedStatus(), is(not(nullValue()))); + + assertEquals(activity.getVerb(), "share"); + + 
assertThat(activity.getObject(), is(not(nullValue()))); + assertThat(activity.getObject().getObjectType(), is(not(nullValue()))); + assertThat(activity.getObject().getObjectType(), is(not(nullValue()))); + + if( retweet.getRetweetedStatus().getEntities() != null && + retweet.getRetweetedStatus().getEntities().getUrls() != null && + retweet.getRetweetedStatus().getEntities().getUrls().size() > 0 ) { + + assertThat(activity.getLinks(), is(not(nullValue()))); + assertEquals(retweet.getRetweetedStatus().getEntities().getUrls().size(), activity.getLinks().size()); + } + + } + + + + } + } + } catch( Exception e ) { + System.out.println(e); + e.printStackTrace(); + Assert.fail(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e757aed/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java index c6dc0ad..bc7bcf7 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java @@ -3,6 +3,7 @@ package org.apache.streams.twitter.test; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Optional; import org.apache.commons.lang.StringUtils; import org.apache.streams.pojo.json.Activity; import org.apache.streams.twitter.pojo.Delete; @@ -21,6 +22,7 @@ import java.io.InputStreamReader; import static java.util.regex.Pattern.matches; import static org.hamcrest.CoreMatchers.*; +import static 
org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -50,6 +52,9 @@ public class TweetSerDeTest { InputStreamReader isr = new InputStreamReader(is); BufferedReader br = new BufferedReader(isr); + int tweetlinks = 0; + int retweetlinks = 0; + try { while (br.ready()) { String line = br.readLine(); @@ -72,6 +77,8 @@ public class TweetSerDeTest { assertThat(tweet.getText(), is(not(nullValue()))); assertThat(tweet.getUser(), is(not(nullValue()))); + tweetlinks += Optional.fromNullable(tweet.getEntities().getUrls().size()).or(0); + } else if( detected == Retweet.class ) { Retweet retweet = mapper.convertValue(event, Retweet.class); @@ -83,6 +90,8 @@ public class TweetSerDeTest { assertThat(retweet.getRetweetedStatus().getUser().getId(), is(not(nullValue()))); assertThat(retweet.getRetweetedStatus().getUser().getCreatedAt(), is(not(nullValue()))); + retweetlinks += Optional.fromNullable(retweet.getRetweetedStatus().getEntities().getUrls().size()).or(0); + } else if( detected == Delete.class ) { Delete delete = mapper.convertValue(event, Delete.class); @@ -95,6 +104,7 @@ public class TweetSerDeTest { } else { Assert.fail(); } + } } } catch( Exception e ) { @@ -102,5 +112,9 @@ public class TweetSerDeTest { e.printStackTrace(); Assert.fail(); } + + assertThat(tweetlinks, is(greaterThan(0))); + assertThat(retweetlinks, is(greaterThan(0))); + } }
