Repository: incubator-streams Updated Branches: refs/heads/STREAMS-26 2ec7fe8cf -> 61592dc07
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/pom.xml b/streams-contrib/streams-provider-google/pom.xml new file mode 100644 index 0000000..57da330 --- /dev/null +++ b/streams-contrib/streams-provider-google/pom.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.streams</groupId> + <artifactId>streams-contrib</artifactId> + <version>0.1.STREAMS26-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>streams-provider-google</artifactId> + + <packaging>pom</packaging> + + <properties> + + </properties> + + <modules> + <module>google-gmail</module> + <module>google-gplus</module> + </modules> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-config</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-pojo</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + </dependencyManagement> +</project> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java 
b/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java index 3e99827..3fc29cc 100644 --- a/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java +++ b/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java @@ -4,6 +4,7 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.tasks.LocalStreamMonitorThread; import org.apache.streams.core.tasks.StreamsProviderTask; import org.apache.streams.core.tasks.StreamsTask; import org.apache.streams.util.SerializationUtil; @@ -11,10 +12,7 @@ import org.joda.time.DateTime; import java.math.BigInteger; import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * {@link org.apache.streams.core.builders.StreamBuilder} implementation to run a data processing stream in a single @@ -29,6 +27,7 @@ public class LocalStreamBuilder implements StreamBuilder{ private Map<String, Object> streamConfig; private ExecutorService executor; private int totalTasks; + private LocalStreamMonitorThread monitorThread; /** * @@ -127,7 +126,9 @@ public class LocalStreamBuilder implements StreamBuilder{ this.executor = Executors.newFixedThreadPool(this.totalTasks); Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>(); Map<String, List<StreamsTask>> streamsTasks = new HashMap<String, List<StreamsTask>>(); + monitorThread = new LocalStreamMonitorThread(this.executor, 1000); try { + this.executor.submit(monitorThread); for(StreamComponent comp : this.components.values()) { int tasks = comp.getNumTasks(); List<StreamsTask> compTasks = new LinkedList<StreamsTask>(); @@ -145,7 +146,6 @@ public 
class LocalStreamBuilder implements StreamBuilder{ this.executor.submit(task); provTasks.put(prov.getId(), (StreamsProviderTask) task); } - while(isRunning) { isRunning = false; for(StreamsProviderTask task : provTasks.values()) { @@ -155,6 +155,7 @@ public class LocalStreamBuilder implements StreamBuilder{ Thread.sleep(3000); } } + //monitorThread.shutdown(); this.executor.shutdown(); //complete stream shut down gracfully for(StreamComponent prov : this.providers.values()) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java b/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java index 19ef977..e382607 100644 --- a/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java +++ b/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java @@ -53,7 +53,7 @@ public abstract class BaseStreamsTask implements StreamsTask { } /** - * NOTE NECCESSARY AT THE MOMENT. MAY BECOME NECESSARY AS WE LOOK AT MAKING JOIN TASKS. CURRENTLY ALL TASK HAVE MAX + * NOTE NECESSARY AT THE MOMENT. MAY BECOME NECESSARY AS WE LOOK AT MAKING JOIN TASKS. CURRENTLY ALL TASK HAVE MAX * OF 1 INPUT QUEUE. * Round Robins through input queues to get the next StreamsDatum. If all input queues are empty, it will return null. * @return the next StreamsDatum or null if all input queues are empty. 
/*
 * http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-core/src/main/java/org/apache/streams/core/tasks/LocalStreamMonitorThread.java
 * ----------------------------------------------------------------------
 * diff --git a/streams-core/src/main/java/org/apache/streams/core/tasks/LocalStreamMonitorThread.java
 *            b/streams-core/src/main/java/org/apache/streams/core/tasks/LocalStreamMonitorThread.java
 * new file mode 100644
 * index 0000000..4c73e74
 * --- /dev/null
 * +++ b/streams-core/src/main/java/org/apache/streams/core/tasks/LocalStreamMonitorThread.java
 *
 * The content below is the new file introduced by this change, shown without the
 * leading '+' diff markers.
 */
package org.apache.streams.core.tasks;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.concurrent.Executor;

/**
 * Periodically logs the JVM heap usage (used and maximum) so that memory growth of a
 * locally running stream is visible in the log.
 *
 * <p>The task loops until {@link #shutdown()} is called or the executing thread is
 * interrupted. The stop flag is {@code volatile}, so {@link #shutdown()} may be called
 * from a thread other than the one executing {@link #run()}.
 */
public class LocalStreamMonitorThread implements Runnable
{
    private static final Logger LOGGER = LoggerFactory.getLogger(LocalStreamMonitorThread.class);

    /** Label logged when the JVM does not report a finite heap maximum. */
    private static final String NO_LIMIT = "NO_LIMIT";

    /** Executor handed in by the creator; stored but not used by this class. */
    private final Executor executor;

    /** Pause between two log statements, in seconds. */
    private final int seconds;

    /**
     * Loop flag. Volatile because {@link #shutdown()} is expected to be called from a
     * different thread than the one running {@link #run()}; without it the loop is not
     * guaranteed to ever observe the write.
     */
    private volatile boolean run = true;

    /**
     * @param executor       executor associated with the monitored stream
     * @param delayInSeconds pause between two memory reports, in seconds
     */
    public LocalStreamMonitorThread(Executor executor, int delayInSeconds) {
        this.executor = executor;
        this.seconds = delayInSeconds;
    }

    /**
     * Asks the monitor loop to stop. The loop exits after its current sleep finishes;
     * interrupt the executing thread to stop it immediately.
     */
    public void shutdown(){
        this.run = false;
    }

    /**
     * Logs used and maximum heap memory, sleeps for the configured delay, and repeats
     * until {@link #shutdown()} is called or the thread is interrupted.
     */
    @Override
    public void run()
    {
        while(run){

            /*
             * Note:
             * Quick class and method to let us see what is going on with the JVM. We need to make sure
             * that everything is running with as little memory as possible. If we are generating a heap
             * overflow, this will be very apparent by the information shown here.
             */

            MemoryUsage memoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();

            // MemoryUsage.getMax() reports an undefined maximum as -1; Long.MAX_VALUE is
            // kept as well so the previous "unlimited" convention still maps to NO_LIMIT.
            long max = memoryUsage.getMax();
            String maxMemory = (max < 0 || max == Long.MAX_VALUE)
                    ? NO_LIMIT
                    : humanReadableByteCount(max, true);

            String usedMemory = humanReadableByteCount(memoryUsage.getUsed(), true);

            LOGGER.info("[monitor] Used Memory: {}, Max: {}",
                    usedMemory,
                    maxMemory);

            try
            {
                // long arithmetic: seconds * 1000 as int overflows for large delays
                Thread.sleep(seconds * 1000L);
            }
            catch (InterruptedException e)
            {
                // Interruption is the executor's cancellation signal (e.g. shutdownNow()):
                // restore the flag for the caller and stop monitoring instead of looping on.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Formats a byte count with one decimal place and a unit prefix.
     *
     * @param bytes number of bytes; values below one unit are returned as {@code "<bytes> B"}
     * @param si    {@code true} for powers of 1000 (kB, MB, ...), {@code false} for powers
     *              of 1024 (KiB, MiB, ...)
     * @return the formatted size, e.g. {@code "1.5 kB"} for 1500 with {@code si == true}
     */
    public String humanReadableByteCount(long bytes, boolean si) {
        int unit = si ? 1000 : 1024;
        if (bytes < unit) {
            return bytes + " B";
        }
        // exponent = how many times the unit fits, i.e. the index into the prefix table
        int exp = (int) (Math.log(bytes) / Math.log(unit));
        String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp-1) + (si ? "" : "i");
        return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre);
    }
}
