adding google provider, tweaks to localbuilder and hdfs reader
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/61592dc0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/61592dc0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/61592dc0 Branch: refs/heads/STREAMS-26 Commit: 61592dc0742bcae969c0c34c9d4158d5d164eddc Parents: 2ec7fe8 Author: Steve Blackmon <[email protected]> Authored: Fri Mar 21 14:44:13 2014 -0500 Committer: Steve Blackmon <[email protected]> Committed: Fri Mar 21 14:44:13 2014 -0500 ---------------------------------------------------------------------- pom.xml | 2 +- streams-contrib/pom.xml | 1 + .../streams-components-test.iml | 39 +++- .../apache/streams/hdfs/HdfsConfigurator.java | 15 ++ .../streams/hdfs/WebHdfsPersistReader.java | 63 ++---- .../streams/hdfs/WebHdfsPersistReaderTask.java | 49 ++--- .../google-gmail/pom.xml | 135 +++++++++++++ .../com/google/gmail/GMailConfigurator.java | 27 +++ .../gmail/provider/GMailImapProviderTask.java | 58 ++++++ .../GMailMessageActivitySerializer.java | 194 +++++++++++++++++++ .../google/gmail/provider/GMailProvider.java | 131 +++++++++++++ .../gmail/provider/GMailRssProviderTask.java | 36 ++++ .../com/google/gmail/GMailConfiguration.json | 14 ++ .../gmail/test/GMailMessageSerDeTest.java | 53 +++++ .../src/test/resources/datasift_jsons.txt | 101 ++++++++++ .../google-gplus/pom.xml | 145 ++++++++++++++ .../gplus/provider/GPlusActivitySerializer.java | 117 +++++++++++ .../gplus/provider/GPlusConfigurator.java | 36 ++++ .../gplus/provider/GPlusEventProcessor.java | 79 ++++++++ .../provider/GPlusHistoryProviderTask.java | 88 +++++++++ .../google/gplus/provider/GPlusProvider.java | 166 ++++++++++++++++ .../com/google/gplus/GPlusConfiguration.json | 58 ++++++ .../src/main/resources/reference.conf | 11 ++ .../gmail/test/GMailMessageSerDeTest.java | 53 +++++ .../src/test/resources/datasift_jsons.txt | 101 ++++++++++ 
streams-contrib/streams-provider-google/pom.xml | 44 +++++ .../core/builders/LocalStreamBuilder.java | 11 +- .../streams/core/tasks/BaseStreamsTask.java | 2 +- .../core/tasks/LocalStreamMonitorThread.java | 69 +++++++ 29 files changed, 1821 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1fec2b7..b358dc6 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,7 @@ <jaxbutil.version>1.2.6</jaxbutil.version> <junit.version>4.11</junit.version> <slf4j.version>1.7.6</slf4j.version> - <logback.version>1.0.9</logback.version> + <logback.version>1.1.1</logback.version> <commons-io.version>2.4</commons-io.version> <commons-lang3.version>3.1</commons-lang3.version> <typesafe.config.version>1.2.0</typesafe.config.version> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml index 896bb9b..239644b 100644 --- a/streams-contrib/pom.xml +++ b/streams-contrib/pom.xml @@ -43,6 +43,7 @@ <module>streams-persist-mongo</module> <module>streams-provider-datasift</module> <module>streams-provider-facebook</module> + <module>streams-provider-google</module> <module>streams-provider-gnip</module> <module>streams-provider-moreover</module> <module>streams-provider-twitter</module> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-components-test/streams-components-test.iml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-components-test/streams-components-test.iml b/streams-contrib/streams-components-test/streams-components-test.iml index fbec879..60ae74a 100644 --- 
a/streams-contrib/streams-components-test/streams-components-test.iml +++ b/streams-contrib/streams-components-test/streams-components-test.iml @@ -11,11 +11,44 @@ <orderEntry type="inheritedJdk" /> <orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="module" module-name="streams-core (1)" /> + <orderEntry type="library" name="Maven: joda-time:joda-time:2.2" level="project" /> + <orderEntry type="module" module-name="streams-util (1)" /> + <orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.1" level="project" /> + <orderEntry type="library" name="Maven: com.google.guava:guava:16.0.1" level="project" /> + <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.6" level="project" /> + <orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.1.1" level="project" /> + <orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.1.1" level="project" /> + <orderEntry type="module" module-name="streams-pojo (1)" /> + <orderEntry type="library" name="Maven: org.jsonschema2pojo:jsonschema2pojo-core:0.4.0" level="project" /> + <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.2.1" level="project" /> + <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.2.1" level="project" /> + <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.2.1" level="project" /> + <orderEntry type="library" name="Maven: com.sun.codemodel:codemodel:2.6" level="project" /> + <orderEntry type="library" name="Maven: commons-lang:commons-lang:2.6" level="project" /> + <orderEntry type="library" name="Maven: javax.validation:validation-api:1.0.0.GA" level="project" /> + <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.9.11" level="project" /> + <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-core-asl:1.9.11" level="project" /> + <orderEntry type="library" name="Maven: 
com.google.code.gson:gson:2.2.4" level="project" /> + <orderEntry type="library" name="Maven: com.google.code.findbugs:annotations:1.3.9" level="project" /> + <orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.2.1" level="project" /> + <orderEntry type="library" name="Maven: com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.2.1" level="project" /> + <orderEntry type="library" name="Maven: org.codehaus.woodstox:stax2-api:3.1.1" level="project" /> + <orderEntry type="library" name="Maven: javax.xml.stream:stax-api:1.0-2" level="project" /> + <orderEntry type="library" name="Maven: com.fasterxml:jackson-xml-databind:0.5.0" level="project" /> + <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-xc:1.7.0" level="project" /> + <orderEntry type="library" name="Maven: com.fasterxml:aalto-xml:0.9.9" level="project" /> + <orderEntry type="library" name="Maven: nz.net.ultraq.jaxb:jaxb-utilities:1.2.6" level="project" /> + <orderEntry type="library" name="Maven: com.sun.xml.bind:jaxb-impl:2.2.7" level="project" /> + <orderEntry type="library" name="Maven: com.sun.xml.bind:jaxb-core:2.2.7" level="project" /> + <orderEntry type="library" name="Maven: javax.xml.bind:jaxb-api:2.2.7" level="project" /> + <orderEntry type="library" name="Maven: com.sun.istack:istack-commons-runtime:2.16" level="project" /> + <orderEntry type="library" name="Maven: com.sun.xml.fastinfoset:FastInfoset:1.2.12" level="project" /> + <orderEntry type="library" name="Maven: javax.xml.bind:jsr173_api:1.0" level="project" /> + <orderEntry type="library" name="Maven: commons-io:commons-io:2.4" level="project" /> + <orderEntry type="library" name="Maven: com.fasterxml.jackson.datatype:jackson-datatype-json-org:2.2.1" level="project" /> + <orderEntry type="library" name="Maven: org.json:json:20090211" level="project" /> <orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" level="project" /> <orderEntry type="library" 
scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" /> - <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.6" level="project" /> - <orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.0.9" level="project" /> - <orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.9" level="project" /> </component> <component name="POM File Configuration" pomFile="" /> </module> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java index dc2b338..dfbc273 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java @@ -1,5 +1,6 @@ package org.apache.streams.hdfs; +import com.fasterxml.jackson.databind.ObjectMapper; import com.typesafe.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -11,6 +12,8 @@ public class HdfsConfigurator { private final static Logger LOGGER = LoggerFactory.getLogger(HdfsConfigurator.class); + private final static ObjectMapper mapper = new ObjectMapper(); + public static HdfsConfiguration detectConfiguration(Config hdfs) { String host = hdfs.getString("host"); Long port = hdfs.getLong("port"); @@ -29,4 +32,16 @@ public class HdfsConfigurator { return hdfsConfiguration; } + public static HdfsReaderConfiguration detectReaderConfiguration(Config hdfs) { + + HdfsConfiguration hdfsConfiguration = detectConfiguration(hdfs); + HdfsReaderConfiguration hdfsReaderConfiguration = mapper.convertValue(hdfsConfiguration, 
HdfsReaderConfiguration.class); + + String readerPath = hdfs.getString("readerPath"); + + hdfsReaderConfiguration.setReaderPath(readerPath); + + return hdfsReaderConfiguration; + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java index 511f684..e01d9d4 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java @@ -1,13 +1,14 @@ package org.apache.streams.hdfs; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Splitter; -import com.google.common.base.Strings; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Queues; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.security.UserGroupInformation; import org.apache.streams.core.StreamsDatum; @@ -17,16 +18,16 @@ import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; import java.math.BigInteger; import java.net.URI; import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; import java.util.Collection; import java.util.Queue; 
-import java.util.concurrent.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; /** * Created by sblackmon on 2/28/14. @@ -117,11 +118,19 @@ public class WebHdfsPersistReader implements StreamsPersistReader { connectToWebHDFS(); path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getReaderPath()); try { - status = client.listStatus(path); + if( client.isFile(path)) { + FileStatus fileStatus = client.getFileStatus(path); + status = new FileStatus[1]; + status[0] = fileStatus; + } else if( client.isDirectory(path)){ + status = client.listStatus(path); + } else { + LOGGER.error("Neither file nor directory, wtf"); + } } catch (IOException e) { e.printStackTrace(); } - persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + persistQueue = new LinkedBlockingQueue<StreamsDatum>(10000); executor = Executors.newSingleThreadExecutor(); } @@ -132,7 +141,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader { @Override public StreamsResultSet readAll() { - readSourceWritePersistQueue(); + startStream(); return new StreamsResultSet(persistQueue); } @@ -167,40 +176,4 @@ public class WebHdfsPersistReader implements StreamsPersistReader { return null; } - private void readSourceWritePersistQueue() { - for( FileStatus fileStatus : status ) { - BufferedReader reader; - LOGGER.info("Found " + fileStatus.getPath().getName()); - if( persistQueue.size() > 0 ) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - } - } - if( fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) { - LOGGER.info("Processing " + fileStatus.getPath().getName()); - try { - reader = new BufferedReader(new InputStreamReader(client.open(fileStatus.getPath()))); - - String line = ""; - do{ - try { - line = reader.readLine(); - if( !Strings.isNullOrEmpty(line) ) { - String[] fields = line.split(Character.toString(DELIMITER)); - StreamsDatum entry = new 
StreamsDatum(fields[3], fields[0], new DateTime(fields[2])); - persistQueue.offer(entry); - } - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn(e.getMessage()); - } - } while( line != null ); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn(e.getMessage()); - } - } - } - } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java index f0bee1f..dc6ea16 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java @@ -3,15 +3,11 @@ package org.apache.streams.hdfs; import com.google.common.base.Strings; import org.apache.hadoop.fs.FileStatus; import org.apache.streams.core.StreamsDatum; -import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; -import java.io.IOException; import java.io.InputStreamReader; -import java.util.Calendar; -import java.util.Random; public class WebHdfsPersistReaderTask implements Runnable { @@ -28,30 +24,39 @@ public class WebHdfsPersistReaderTask implements Runnable { for( FileStatus fileStatus : reader.status ) { BufferedReader bufferedReader; - + LOGGER.info("Found " + fileStatus.getPath().getName()); if( fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) { + LOGGER.info("Processing " + fileStatus.getPath().getName()); try { bufferedReader = new BufferedReader(new InputStreamReader(reader.client.open(fileStatus.getPath()))); + } catch (Exception e) { + 
e.printStackTrace(); + LOGGER.error(e.getMessage()); + return; + } - String line = ""; - do{ - try { - line = bufferedReader.readLine(); - if( !Strings.isNullOrEmpty(line) ) { - String[] fields = line.split(Character.toString(reader.DELIMITER)); - Calendar cal = Calendar.getInstance(); - cal.setTimeInMillis(new Long(fields[2])); - StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(cal.getTime())); - reader.persistQueue.offer(entry); + String line = ""; + do{ + try { + line = bufferedReader.readLine(); + if( !Strings.isNullOrEmpty(line) ) { + String[] fields = line.split(Character.toString(reader.DELIMITER)); + StreamsDatum entry = new StreamsDatum(fields[3], fields[0]); + boolean success; + do { + success = reader.persistQueue.offer(entry); + Thread.yield(); } - } catch (Exception e) { - LOGGER.warn("Failed reading " + line); + while( success == false ); + } - } while( line != null ); - } catch (IOException e) { - e.printStackTrace(); - break; - } + } catch (Exception e) { + e.printStackTrace(); + LOGGER.warn(e.getMessage()); + } + } while( !Strings.isNullOrEmpty(line) ); + LOGGER.info("Finished Processing " + fileStatus.getPath().getName()); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/pom.xml b/streams-contrib/streams-provider-google/google-gmail/pom.xml new file mode 100644 index 0000000..84e0346 --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gmail/pom.xml @@ -0,0 +1,135 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.streams</groupId> + 
<artifactId>streams-provider-google</artifactId> + <version>0.1.STREAMS26-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>google-gmail</artifactId> + + <repositories> + <repository> + <id>typesafe</id> + <name>typesafe</name> + <url>http://repo.typesafe.com/typesafe/repo</url> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>com.typesafe</groupId> + <artifactId>config</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + <dependency> + <groupId>org.jsonschema2pojo</groupId> + <artifactId>jsonschema2pojo-core</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-pojo</artifactId> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-config</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>com.jayway.jsonpath</groupId> + <artifactId>json-path</artifactId> + </dependency> + <dependency> + <groupId>com.jayway.jsonpath</groupId> + <artifactId>json-path-assert</artifactId> + </dependency> + <dependency> + <groupId>com.googlecode.gmail4j</groupId> + <artifactId>gmail4j</artifactId> + <version>0.5-SNAPSHOT</version> + </dependency> + </dependencies> + + <build> + <sourceDirectory>src/main/java</sourceDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <resources> + <resource> + 
<directory>src/main/resources</directory> + </resource> + </resources> + <testResources> + <testResource> + <directory>src/test/resources</directory> + </testResource> + </testResources> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.8</version> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/jsonschema2pojo</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.jsonschema2pojo</groupId> + <artifactId>jsonschema2pojo-maven-plugin</artifactId> + <configuration> + <addCompileSourceRoot>true</addCompileSourceRoot> + <generateBuilders>true</generateBuilders> + <sourcePaths> + <sourcePath>src/main/jsonschema/com/google/gmail</sourcePath> + </sourcePaths> + <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory> + <targetPackage>org.apache.streams.google.gmail.pojo</targetPackage> + <useLongIntegers>true</useLongIntegers> + <useJodaDates>false</useJodaDates> + </configuration> + <executions> + <execution> + <goals> + <goal>generate</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java new file mode 100644 index 0000000..2d2d2aa --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java @@ -0,0 
+1,27 @@ +package com.google.gmail; + +import com.google.gmail.GMailConfiguration; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigException; +import org.apache.streams.config.StreamsConfigurator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by sblackmon on 12/10/13. + */ +public class GMailConfigurator { + + private final static Logger LOGGER = LoggerFactory.getLogger(GMailConfigurator.class); + + public static GMailConfiguration detectConfiguration(Config gmail) { + + GMailConfiguration gmailConfiguration = new GMailConfiguration(); + + gmailConfiguration.setUserName(gmail.getString("username")); + gmailConfiguration.setPassword(gmail.getString("password")); + + return gmailConfiguration; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java new file mode 100644 index 0000000..068c214 --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java @@ -0,0 +1,58 @@ +package com.google.gmail.provider; + +import com.googlecode.gmail4j.GmailClient; +import com.googlecode.gmail4j.GmailMessage; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.pojo.json.Activity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Calendar; +import java.util.GregorianCalendar; +import java.util.List; + +/** + * Created by sblackmon on 12/10/13. 
+ */ +public class GMailImapProviderTask implements Runnable { + + private final static Logger LOGGER = LoggerFactory.getLogger(GMailImapProviderTask.class); + + private GMailProvider provider; + + public GMailImapProviderTask(GMailProvider provider) { + this.provider = provider; + } + + @Override + public void run() { + + Calendar calendar = new GregorianCalendar(); + + calendar.set(Calendar.YEAR, 2000); + calendar.set(Calendar.MONTH, 0); + calendar.set(Calendar.DAY_OF_MONTH, 0); + calendar.set(Calendar.HOUR, 0); + calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MILLISECOND, 0); + + final List<GmailMessage> messages = this.provider.imapClient.getMessagesBy( + GmailClient.EmailSearchStrategy.DATE_GT, + calendar.getTime().toString() + ); + + for (GmailMessage message : messages) { + + Activity activity; + GMailMessageActivitySerializer serializer = new GMailMessageActivitySerializer( this.provider ); + activity = serializer.deserialize(message); + StreamsDatum entry = new StreamsDatum(activity); + this.provider.providerQueue.offer(entry); + + } + + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java new file mode 100644 index 0000000..6c49bda --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java @@ -0,0 +1,194 @@ +package com.google.gmail.provider; + +import com.fasterxml.jackson.annotation.JsonBackReference; +import 
com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonManagedReference; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.AnnotationIntrospector; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.googlecode.gmail4j.GmailException; +import com.googlecode.gmail4j.GmailMessage; +import com.googlecode.gmail4j.javamail.JavaMailGmailMessage; +import com.sun.mail.imap.IMAPFolder; +import com.sun.mail.imap.IMAPMessage; +import com.sun.mail.imap.IMAPSSLStore; +import org.apache.commons.lang.NotImplementedException; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.pojo.json.*; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.mail.internet.MimeMultipart; +import java.io.IOException; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Map; + +import static org.apache.streams.data.util.ActivityUtil.ensureExtensions; + +/** +* Created with IntelliJ IDEA. +* User: mdelaet +* Date: 9/30/13 +* Time: 9:24 AM +* To change this template use File | Settings | File Templates. 
+*/ +public class GMailMessageActivitySerializer implements ActivitySerializer<GmailMessage> { + + private static final Logger LOGGER = LoggerFactory.getLogger(GMailMessageActivitySerializer.class); + + GMailProvider provider; + + ObjectMapper mapper = new ObjectMapper(); + + public GMailMessageActivitySerializer(GMailProvider provider) { + + this.provider = provider; + + mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, Boolean.FALSE); + + mapper.addMixInAnnotations(IMAPSSLStore.class, MessageMixIn.class); + mapper.addMixInAnnotations(IMAPFolder.class, MessageMixIn.class); + mapper.addMixInAnnotations(IMAPMessage.class, MessageMixIn.class); + mapper.addMixInAnnotations(MimeMultipart.class, MessageMixIn.class); + mapper.addMixInAnnotations(JavaMailGmailMessage.class, MessageMixIn.class); + + } + + public GMailMessageActivitySerializer() { + } + + @Override + public String serializationFormat() { + return "gmail.v1"; + } + + @Override + public GmailMessage serialize(Activity activity) { + return null; + } + + @Override + public Activity deserialize(GmailMessage gmailMessage) { + + Activity activity = new Activity(); + activity.setId(formatId(this.provider.getConfig().getUserName(), String.valueOf(gmailMessage.getMessageNumber()))); + activity.setPublished(gmailMessage.getSendDate()); + Provider provider = new Provider(); + provider.setId("http://gmail.com"); + provider.setDisplayName("GMail"); + activity.setProvider(provider); + Actor actor = new Actor(); + actor.setId(gmailMessage.getFrom().getEmail()); + actor.setDisplayName(gmailMessage.getFrom().getName()); + activity.setActor(actor); + activity.setVerb("email"); + ActivityObject object = new ActivityObject(); + try { + object.setId(gmailMessage.getTo().get(0).getEmail()); + object.setDisplayName(gmailMessage.getTo().get(0).getName()); + } catch( GmailException e ) { + LOGGER.warn(e.getMessage()); + } + activity.setTitle(gmailMessage.getSubject()); + try { + 
activity.setContent(gmailMessage.getContentText()); + } catch( GmailException e ) { + LOGGER.warn(e.getMessage()); + } + activity.setObject(object); + +// try { +// // if jackson can't serialize the object, find out now +// String jsonString = mapper.writeValueAsString(gmailMessage); +// ObjectNode jsonObject = mapper.valueToTree(gmailMessage); +// // since it can, write the entire source object to extensions.gmail +// Map<String, Object> extensions = Maps.newHashMap(); +// extensions.put("gmail", gmailMessage); +// activity.setAdditionalProperty("extensions", extensions); +// } catch (JsonProcessingException e) { +// LOGGER.debug("Failed Json Deserialization"); +// e.printStackTrace(); +// } + + return activity; + } + + @Override + public List<Activity> deserializeAll(List<GmailMessage> serializedList) { + throw new NotImplementedException("Not currently implemented"); + } + + public Activity convert(ObjectNode event) { + return null; + } + + public static Generator buildGenerator(ObjectNode event) { + return null; + } + + public static Icon getIcon(ObjectNode event) { + return null; + } + + public static Provider buildProvider(ObjectNode event) { + Provider provider = new Provider(); + provider.setId("id:providers:gmail"); + return provider; + } + + public static List<Object> getLinks(ObjectNode event) { + return null; + } + + public static String getUrls(ObjectNode event) { + return null; + } + + public static void addGMailExtension(Activity activity, GmailMessage gmailMessage) { + Map<String, Object> extensions = ensureExtensions(activity); + extensions.put("gmail", gmailMessage); + } + + public static String formatId(String... idparts) { + return Joiner.on(":").join(Lists.asList("id:gmail", idparts)); + } + + interface MessageMixIn { + @JsonManagedReference + @JsonIgnore + IMAPSSLStore getDefaultFolder(); // we don't need it! + @JsonManagedReference + @JsonIgnore + IMAPSSLStore getPersonalNamespaces(); // we don't need it! 
+ @JsonManagedReference + @JsonIgnore + IMAPFolder getStore(); // we don't need it! + // @JsonManagedReference +// @JsonIgnore +// @JsonBackReference + //IMAPFolder getParent(); // we don't need it! + @JsonManagedReference + @JsonIgnore + @JsonBackReference + IMAPMessage getFolder(); // we don't need it! + @JsonManagedReference + @JsonIgnore + @JsonProperty("parent") + @JsonBackReference + MimeMultipart getParent(); + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
new file mode 100644
index 0000000..7ec157e
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
@@ -0,0 +1,172 @@
+package com.google.gmail.provider;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.gmail.GMailConfiguration;
+import com.google.gmail.GMailConfigurator;
+import com.googlecode.gmail4j.GmailClient;
+import com.googlecode.gmail4j.GmailConnection;
+import com.googlecode.gmail4j.http.HttpGmailConnection;
+import com.googlecode.gmail4j.javamail.ImapGmailClient;
+import com.googlecode.gmail4j.javamail.ImapGmailConnection;
+import com.googlecode.gmail4j.rss.RssGmailClient;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Queue;
+import java.util.concurrent.*;
+
+/**
+ * Streams provider backed by the gmail4j RSS and IMAP clients.
+ *
+ * Credentials come from a {@link GMailConfiguration}, either passed to a constructor or
+ * detected from the "gmail" block of the streams configuration.
+ * {@link #prepare(Object)} builds the clients, {@link #startStream()} launches a
+ * {@link GMailImapProviderTask}, and {@link #cleanUp()} shuts down {@link #executor}.
+ *
+ * Created by sblackmon on 12/10/13.
+ */
+public class GMailProvider implements StreamsProvider {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GMailProvider.class);
+
+    /** How long {@link #cleanUp()} waits for running tasks before forcing shutdown. */
+    private static final long SHUTDOWN_TIMEOUT_SECONDS = 5;
+
+    private GMailConfiguration config;
+
+    /** Required by {@link #prepare(Object)}; set through the {@code Class} constructors. */
+    private Class<?> klass;
+
+    public GMailConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(GMailConfiguration config) {
+        this.config = config;
+    }
+
+    /** Bounded (10000) inbound buffer. NOTE(review): raw type; getInQueue() exposes it as BlockingQueue<Object>. */
+    protected BlockingQueue inQueue = new LinkedBlockingQueue<String>(10000);
+
+    /** Datums awaiting consumption; {@link GMailRssProviderTask} offers into it. */
+    protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
+    public BlockingQueue<Object> getInQueue() {
+        return inQueue;
+    }
+
+    protected GmailClient rssClient;
+    protected ImapGmailClient imapClient;
+
+    protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
+    /**
+     * Creates a fixed pool of {@code nThreads} threads over a fair queue of {@code queueSize}
+     * slots; when the queue is full the submitting thread runs the task itself.
+     */
+    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+        return new ThreadPoolExecutor(nThreads, nThreads,
+                5000L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    /** Detects the configuration from the "gmail" block of the streams configuration. */
+    public GMailProvider() {
+        Config config = StreamsConfigurator.config.getConfig("gmail");
+        this.config = GMailConfigurator.detectConfiguration(config);
+    }
+
+    public GMailProvider(GMailConfiguration config) {
+        this.config = config;
+    }
+
+    public GMailProvider(Class klass) {
+        Config config = StreamsConfigurator.config.getConfig("gmail");
+        this.config = GMailConfigurator.detectConfiguration(config);
+        this.klass = klass;
+    }
+
+    public GMailProvider(GMailConfiguration config, Class klass) {
+        this.config = config;
+        this.klass = klass;
+    }
+
+    /**
+     * Launches a {@link GMailImapProviderTask} on a new thread.
+     * NOTE(review): that thread is not owned by {@link #executor}, so cleanUp() does not wait for it.
+     */
+    @Override
+    public void startStream() {
+        new Thread(new GMailImapProviderTask(this)).start();
+    }
+
+    /** Not implemented: always returns {@code null} (providerQueue is never drained here). */
+    @Override
+    public StreamsResultSet readCurrent() {
+        return null;
+    }
+
+    /** Not implemented: always returns {@code null}. */
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    /** Not implemented: always returns {@code null}. */
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    /**
+     * Validates the configuration and builds the RSS (HTTP) and IMAP gmail4j clients.
+     *
+     * @param configurationObject ignored
+     * @throws NullPointerException if klass, the configuration, its user name or its password is null
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+        Preconditions.checkNotNull(this.klass, "klass must be set before prepare()");
+        Preconditions.checkNotNull(config, "config");
+        Preconditions.checkNotNull(config.getUserName(), "config.userName");
+        Preconditions.checkNotNull(config.getPassword(), "config.password");
+
+        rssClient = new RssGmailClient();
+        GmailConnection rssConnection = new HttpGmailConnection(config.getUserName(), config.getPassword().toCharArray());
+        rssClient.setConnection(rssConnection);
+
+        imapClient = new ImapGmailClient();
+        GmailConnection imapConnection = new ImapGmailConnection();
+        imapConnection.setLoginCredentials(config.getUserName(), config.getPassword().toCharArray());
+        imapClient.setConnection(imapConnection);
+    }
+
+    /**
+     * Shuts down {@link #executor}, waiting up to {@value #SHUTDOWN_TIMEOUT_SECONDS} seconds for
+     * running tasks before cancelling them. shutdown() must come first: without it
+     * awaitTermination() can only return by timing out.
+     */
+    @Override
+    public void cleanUp() {
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+                LOGGER.warn("Executor did not terminate within {}s; forcing shutdown", SHUTDOWN_TIMEOUT_SECONDS);
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            executor.shutdownNow();
+            // restore the interrupt flag so the caller can still observe it
+            Thread.currentThread().interrupt();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java
new file mode 100644
index 0000000..73b6d77
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java
@@ -0,0 +1,40 @@
+package com.google.gmail.provider;
+
+import com.googlecode.gmail4j.GmailMessage;
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Calls {@code rssClient.getUnreadMessages()} on the owning {@link GMailProvider} once and
+ * offers each returned message, wrapped in a {@link StreamsDatum}, to its providerQueue.
+ *
+ * Created by sblackmon on 12/10/13.
+ */
+public class GMailRssProviderTask implements Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GMailRssProviderTask.class);
+
+    private final GMailProvider provider;
+
+    public GMailRssProviderTask(GMailProvider provider) {
+        this.provider = provider;
+    }
+
+    @Override
+    public void run() {
+        final List<GmailMessage> messages = this.provider.rssClient.getUnreadMessages();
+        if (messages == null) {
+            // guard: nothing to enqueue, and iterating null would throw
+            LOGGER.debug("RSS client returned no message list");
+            return;
+        }
+        for (GmailMessage message : messages) {
+            this.provider.providerQueue.offer(new StreamsDatum(message));
+        }
+        LOGGER.debug("Queued {} messages", messages.size());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/com/google/gmail/GMailConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/com/google/gmail/GMailConfiguration.json b/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/com/google/gmail/GMailConfiguration.json
new file mode 100644
index 0000000..b25d5e0
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/com/google/gmail/GMailConfiguration.json
@@ -0,0 +1,14 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "com.google.gmail.GMailConfiguration",
+    "properties": {
+        "userName": {
+            "type": "string"
+        },
+        "password": {
+            "type": "string"
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
b/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
new file mode 100644
index 0000000..e9641fc
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
@@ -0,0 +1,65 @@
+package com.google.gmail.test;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+/**
+ * Scaffold for a GMail message (de)serialization test.
+ *
+ * It currently only reads /datasift_jsons.txt line by line and logs each line; the
+ * per-line check is still a TODO, and the class is {@code @Ignore}d.
+ *
+ * Created by sblackmon on 8/20/13.
+ */
+@Ignore
+public class GMailMessageSerDeTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GMailMessageSerDeTest.class);
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    @Ignore
+    @Test
+    public void Tests()
+    {
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+
+        InputStream is = GMailMessageSerDeTest.class.getResourceAsStream("/datasift_jsons.txt");
+        // fail with a clear message rather than an NPE inside InputStreamReader
+        Assert.assertNotNull("missing test resource /datasift_jsons.txt", is);
+
+        try {
+            // decode as UTF-8 explicitly instead of the platform default charset
+            BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+            String line;
+            // readLine() returning null is the end-of-stream signal; ready() can be false earlier
+            while ((line = br.readLine()) != null) {
+                LOGGER.debug(line);
+
+                // implement
+            }
+        } catch( Exception e ) {
+            LOGGER.error("Unable to read /datasift_jsons.txt", e);
+            Assert.fail("Unable to read /datasift_jsons.txt: " + e);
+        } finally {
+            // closing the underlying stream is enough; the readers wrap no other resources
+            try {
+                is.close();
+            } catch (IOException e) {
+                LOGGER.warn("Unable to close /datasift_jsons.txt", e);
+            }
+        }
+    }
+}
