Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-26 2ec7fe8cf -> 61592dc07


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/pom.xml 
b/streams-contrib/streams-provider-google/pom.xml
new file mode 100644
index 0000000..57da330
--- /dev/null
+++ b/streams-contrib/streams-provider-google/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-contrib</artifactId>
+        <version>0.1.STREAMS26-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streams-provider-google</artifactId>
+
+    <packaging>pom</packaging>
+
+    <properties>
+
+    </properties>
+
+    <modules>
+        <module>google-gmail</module>
+        <module>google-gplus</module>
+    </modules>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.streams</groupId>
+                <artifactId>streams-config</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.streams</groupId>
+                <artifactId>streams-core</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.streams</groupId>
+                <artifactId>streams-pojo</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
 
b/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
index 3e99827..3fc29cc 100644
--- 
a/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
+++ 
b/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
@@ -4,6 +4,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.tasks.LocalStreamMonitorThread;
 import org.apache.streams.core.tasks.StreamsProviderTask;
 import org.apache.streams.core.tasks.StreamsTask;
 import org.apache.streams.util.SerializationUtil;
@@ -11,10 +12,7 @@ import org.joda.time.DateTime;
 
 import java.math.BigInteger;
 import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 /**
  * {@link org.apache.streams.core.builders.StreamBuilder} implementation to 
run a data processing stream in a single
@@ -29,6 +27,7 @@ public class LocalStreamBuilder implements StreamBuilder{
     private Map<String, Object> streamConfig;
     private ExecutorService executor;
     private int totalTasks;
+    private LocalStreamMonitorThread monitorThread;
 
     /**
      *
@@ -127,7 +126,9 @@ public class LocalStreamBuilder implements StreamBuilder{
         this.executor = Executors.newFixedThreadPool(this.totalTasks);
         Map<String, StreamsProviderTask> provTasks = new HashMap<String, 
StreamsProviderTask>();
         Map<String, List<StreamsTask>> streamsTasks = new HashMap<String, 
List<StreamsTask>>();
+        monitorThread = new LocalStreamMonitorThread(this.executor, 1000);
         try {
+            this.executor.submit(monitorThread);
             for(StreamComponent comp : this.components.values()) {
                 int tasks = comp.getNumTasks();
                 List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
@@ -145,7 +146,6 @@ public class LocalStreamBuilder implements StreamBuilder{
                 this.executor.submit(task);
                 provTasks.put(prov.getId(), (StreamsProviderTask) task);
             }
-
             while(isRunning) {
                 isRunning = false;
                 for(StreamsProviderTask task : provTasks.values()) {
@@ -155,6 +155,7 @@ public class LocalStreamBuilder implements StreamBuilder{
                     Thread.sleep(3000);
                 }
             }
+            //monitorThread.shutdown();
             this.executor.shutdown();
             //complete stream shut down gracfully 
             for(StreamComponent prov : this.providers.values()) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java 
b/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
index 19ef977..e382607 100644
--- 
a/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
+++ 
b/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
@@ -53,7 +53,7 @@ public abstract class BaseStreamsTask implements StreamsTask {
     }
 
     /**
-     * NOTE NECCESSARY AT THE MOMENT.  MAY BECOME NECESSARY AS WE LOOK AT 
MAKING JOIN TASKS. CURRENTLY ALL TASK HAVE MAX
+     * NOTE NECESSARY AT THE MOMENT.  MAY BECOME NECESSARY AS WE LOOK AT 
MAKING JOIN TASKS. CURRENTLY ALL TASK HAVE MAX
      * OF 1 INPUT QUEUE.
      * Round Robins through input queues to get the next StreamsDatum. If all 
input queues are empty, it will return null.
      * @return the next StreamsDatum or null if all input queues are empty.

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-core/src/main/java/org/apache/streams/core/tasks/LocalStreamMonitorThread.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/tasks/LocalStreamMonitorThread.java
 
b/streams-core/src/main/java/org/apache/streams/core/tasks/LocalStreamMonitorThread.java
new file mode 100644
index 0000000..4c73e74
--- /dev/null
+++ 
b/streams-core/src/main/java/org/apache/streams/core/tasks/LocalStreamMonitorThread.java
@@ -0,0 +1,69 @@
+package org.apache.streams.core.tasks;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryUsage;
+import java.util.concurrent.Executor;
+
+/**
+ * Logs JVM heap usage at a fixed interval while a local stream runs, until
+ * {@link #shutdown()} is called or the running thread is interrupted.
+ */
+public class LocalStreamMonitorThread implements Runnable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(LocalStreamMonitorThread.class);
+
+    private final Executor executor;
+    private final int seconds;
+    /** Volatile so that shutdown() from another thread is seen by run(). */
+    private volatile boolean run = true;
+
+    /**
+     * @param executor       executor passed in by the caller; stored, not used here
+     * @param delayInSeconds pause between two log lines, in seconds
+     */
+    public LocalStreamMonitorThread(Executor executor, int delayInSeconds) {
+        this.executor = executor;
+        this.seconds = delayInSeconds;
+    }
+
+    /** Asks the loop in {@link #run()} to stop after its current pass. */
+    public void shutdown() {
+        this.run = false;
+    }
+
+    /** Logs used and max heap, sleeps, and repeats until stopped. */
+    @Override
+    public void run() {
+        while (run) {
+            MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+            long max = heap.getMax(); // -1 when the maximum heap size is undefined
+            String maxMemory = (max < 0 || max == Long.MAX_VALUE)
+                    ? "NO_LIMIT" : humanReadableByteCount(max, true);
+            LOGGER.info("[monitor] Used Memory: {}, Max: {}",
+                    humanReadableByteCount(heap.getUsed(), true), maxMemory);
+            try {
+                // Multiply as long so a large delay cannot overflow int.
+                Thread.sleep(seconds * 1000L);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag and stop instead of looping on.
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+
+    /**
+     * @param bytes number of bytes
+     * @param si    true for powers of 1000 (kB), false for powers of 1024 (KiB)
+     * @return one decimal plus unit prefix, e.g. "1.5 kB"; "n B" below one unit
+     */
+    public String humanReadableByteCount(long bytes, boolean si) {
+        int unit = si ? 1000 : 1024;
+        if (bytes < unit) return bytes + " B";
+        int exp = (int) (Math.log(bytes) / Math.log(unit));
+        String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp-1) + (si ? "" : "i");
+        return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre);
+    }
+}

Reply via email to