Fixed memory leak in ES reader pipelines
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f877c5fa Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f877c5fa Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f877c5fa Branch: refs/heads/master Commit: f877c5fa46918f09a429139dbe5d2dfbf5eb4ea3 Parents: 79ac9aa Author: sblackmon <[email protected]> Authored: Fri Apr 11 07:17:13 2014 -0600 Committer: sblackmon <[email protected]> Committed: Fri Apr 11 07:17:13 2014 -0600 ---------------------------------------------------------------------- pom.xml | 2 +- streams-config/pom.xml | 2 +- streams-contrib/pom.xml | 2 +- .../streams-persist-cassandra/pom.xml | 2 +- streams-contrib/streams-persist-console/pom.xml | 2 +- .../streams-persist-elasticsearch/pom.xml | 2 +- .../ElasticsearchPersistReader.java | 22 ++++++++++---------- .../ElasticsearchPersistReaderTask.java | 14 ++++++++++++- streams-contrib/streams-persist-hbase/pom.xml | 2 +- streams-contrib/streams-persist-hdfs/pom.xml | 2 +- streams-contrib/streams-persist-kafka/pom.xml | 2 +- streams-contrib/streams-persist-mongo/pom.xml | 2 +- streams-contrib/streams-processor-urls/pom.xml | 4 ++-- .../streams-provider-datasift/pom.xml | 2 +- .../streams-provider-facebook/pom.xml | 2 +- .../gnip-edc-facebook/pom.xml | 2 +- .../gnip-edc-flickr/pom.xml | 2 +- .../gnip-edc-googleplus/pom.xml | 2 +- .../gnip-edc-instagram/pom.xml | 2 +- .../gnip-edc-reddit/pom.xml | 2 +- .../gnip-edc-youtube/pom.xml | 2 +- .../gnip-powertrack/pom.xml | 2 +- streams-contrib/streams-provider-gnip/pom.xml | 2 +- .../google-gmail/pom.xml | 2 +- .../google-gplus/pom.xml | 2 +- streams-contrib/streams-provider-google/pom.xml | 2 +- .../streams-provider-moreover/pom.xml | 2 +- streams-contrib/streams-provider-rss/pom.xml | 2 +- .../streams-provider-sysomos/pom.xml | 2 +- .../streams-provider-twitter/pom.xml | 2 +- streams-core/pom.xml | 2 +- .../activity-consumer/pom.xml | 2 +- .../activity-registration/pom.xml | 2 +- .../activity-subscriber/pom.xml | 2 +- streams-osgi-components/pom.xml | 2 +- .../streams-components-all/pom.xml | 2 +- streams-pojo/pom.xml | 2 +- streams-runtimes/pom.xml | 2 +- streams-runtimes/streams-runtime-local/pom.xml | 2 +- .../streams/local/tasks/BaseStreamsTask.java | 14 +++---------- streams-runtimes/streams-runtime-pig/pom.xml | 2 +- streams-runtimes/streams-runtime-storm/pom.xml | 2 +- streams-runtimes/streams-runtime-webapp/pom.xml | 2 +- streams-util/pom.xml | 2 +- streams-web/pom.xml | 2 +- 45 files changed, 70 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 069a100..78f5d6f 100644 --- a/pom.xml +++ b/pom.xml @@ -29,7 +29,7 @@ <groupId>org.apache.streams</groupId> <artifactId>streams-project</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> <name>Apache Streams Project</name> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-config/pom.xml ---------------------------------------------------------------------- diff --git a/streams-config/pom.xml b/streams-config/pom.xml index 92dd7c8..f31301c 100644 --- a/streams-config/pom.xml +++ b/streams-config/pom.xml @@ -25,7 +25,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-project</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <artifactId>streams-config</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml index 727fa28..d80fc63 100644 --- a/streams-contrib/pom.xml +++ b/streams-contrib/pom.xml @@ -23,7 +23,7 @@ <parent> <artifactId>streams-project</artifactId> <groupId>org.apache.streams</groupId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-persist-cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-cassandra/pom.xml b/streams-contrib/streams-persist-cassandra/pom.xml index 33967b0..fd6711f 100644 --- a/streams-contrib/streams-persist-cassandra/pom.xml +++ b/streams-contrib/streams-persist-cassandra/pom.xml @@ -25,7 +25,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-contrib</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <artifactId>streams-persist-cassandra</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-persist-console/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-console/pom.xml b/streams-contrib/streams-persist-console/pom.xml index d62da1c..c7f2cd3 100644 --- a/streams-contrib/streams-persist-console/pom.xml +++ b/streams-contrib/streams-persist-console/pom.xml @@ -5,7 +5,7 @@ <parent> <artifactId>streams-contrib</artifactId> <groupId>org.apache.streams</groupId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-persist-elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml index 6af675c..07433ba 100644 --- a/streams-contrib/streams-persist-elasticsearch/pom.xml +++ b/streams-contrib/streams-persist-elasticsearch/pom.xml @@ -5,7 +5,7 @@ <parent> <artifactId>streams-contrib</artifactId> <groupId>org.apache.streams</groupId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java index 4fde58d..ea74bf6 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java @@ -11,9 +11,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.typesafe.config.Config; import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistReader; -import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.core.*; import org.apache.streams.jackson.StreamsJacksonMapper; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; @@ -121,7 +119,7 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl public void prepare(Object o) { mapper = StreamsJacksonMapper.getInstance(); - persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000)); // If we haven't already set up the search, then set up the search. if(search == null) @@ -290,14 +288,16 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl @Override public StreamsResultSet readCurrent() { - LOGGER.debug("readCurrent: {}", persistQueue.size()); + StreamsResultSet current; - Collection<StreamsDatum> currentIterator = Lists.newArrayList(); - Iterators.addAll(currentIterator, persistQueue.iterator()); - - StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator)); - - persistQueue.clear(); + synchronized( ElasticsearchPersistReader.class ) { + current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); + current.setCounter(new DatumStatusCounter()); +// current.getCounter().add(countersCurrent); +// countersTotal.add(countersCurrent); +// countersCurrent = new DatumStatusCounter(); + persistQueue.clear(); + } return current; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java index 505dc01..7750fac 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java @@ -4,11 +4,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.streams.core.StreamsDatum; import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.util.ComponentUtils; import org.elasticsearch.search.SearchHit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Queue; import java.util.Random; public class ElasticsearchPersistReaderTask implements Runnable { @@ -40,7 +42,7 @@ public class ElasticsearchPersistReaderTask implements Runnable { item.getMetadata().put("id", hit.getId()); item.getMetadata().put("index", hit.getIndex()); item.getMetadata().put("type", hit.getType()); - reader.persistQueue.offer(item); + write(item); } try { Thread.sleep(new Random().nextInt(100)); @@ -48,4 +50,14 @@ public class ElasticsearchPersistReaderTask implements Runnable { } + private void write( StreamsDatum entry ) { + boolean success; + do { + synchronized( ElasticsearchPersistReader.class ) { + success = reader.persistQueue.offer(entry); + } + Thread.yield(); + } + while( !success ); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-persist-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hbase/pom.xml b/streams-contrib/streams-persist-hbase/pom.xml index 04f8c39..18ec32e 100644 --- a/streams-contrib/streams-persist-hbase/pom.xml +++ b/streams-contrib/streams-persist-hbase/pom.xml @@ -5,7 +5,7 @@ <parent> <artifactId>streams-contrib</artifactId> <groupId>org.apache.streams</groupId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-persist-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/pom.xml b/streams-contrib/streams-persist-hdfs/pom.xml index a111f1a..5fe33b3 100644 --- a/streams-contrib/streams-persist-hdfs/pom.xml +++ b/streams-contrib/streams-persist-hdfs/pom.xml @@ -5,7 +5,7 @@ <parent> <artifactId>streams-contrib</artifactId> <groupId>org.apache.streams</groupId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-persist-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/pom.xml b/streams-contrib/streams-persist-kafka/pom.xml index 6ee84bb..84ddee8 100644 --- a/streams-contrib/streams-persist-kafka/pom.xml +++ b/streams-contrib/streams-persist-kafka/pom.xml @@ -5,7 +5,7 @@ <parent> <artifactId>streams-contrib</artifactId> <groupId>org.apache.streams</groupId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-persist-mongo/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-mongo/pom.xml b/streams-contrib/streams-persist-mongo/pom.xml index ae0f91d..fbe1723 100644 --- a/streams-contrib/streams-persist-mongo/pom.xml +++ b/streams-contrib/streams-persist-mongo/pom.xml @@ -5,7 +5,7 @@ <parent> <artifactId>streams-contrib</artifactId> <groupId>org.apache.streams</groupId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-processor-urls/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-urls/pom.xml b/streams-contrib/streams-processor-urls/pom.xml index 3e820ce..b320ca5 100644 --- a/streams-contrib/streams-processor-urls/pom.xml +++ b/streams-contrib/streams-processor-urls/pom.xml @@ -5,12 +5,12 @@ <modelVersion>4.0.0</modelVersion> <artifactId>streams-processor-urls</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-contrib</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <dependencies> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-datasift/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/pom.xml b/streams-contrib/streams-provider-datasift/pom.xml index 0f249b5..6d59514 100644 --- a/streams-contrib/streams-provider-datasift/pom.xml +++ b/streams-contrib/streams-provider-datasift/pom.xml @@ -5,7 +5,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-contrib</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-facebook/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/pom.xml b/streams-contrib/streams-provider-facebook/pom.xml index 6036690..0d54255 100644 --- a/streams-contrib/streams-provider-facebook/pom.xml +++ b/streams-contrib/streams-provider-facebook/pom.xml @@ -5,7 +5,7 @@ <parent> <artifactId>streams-contrib</artifactId> <groupId>org.apache.streams</groupId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>streams-provider-facebook</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-gnip/gnip-edc-facebook/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-gnip/gnip-edc-facebook/pom.xml b/streams-contrib/streams-provider-gnip/gnip-edc-facebook/pom.xml index 8c0cab2..e0603bd 100644 --- a/streams-contrib/streams-provider-gnip/gnip-edc-facebook/pom.xml +++ b/streams-contrib/streams-provider-gnip/gnip-edc-facebook/pom.xml @@ -5,7 +5,7 @@ <parent> <artifactId>streams-provider-gnip</artifactId> <groupId>org.apache.streams</groupId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-gnip/gnip-edc-flickr/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-gnip/gnip-edc-flickr/pom.xml b/streams-contrib/streams-provider-gnip/gnip-edc-flickr/pom.xml index 25b77f7..e96b90c 100644 --- a/streams-contrib/streams-provider-gnip/gnip-edc-flickr/pom.xml +++ b/streams-contrib/streams-provider-gnip/gnip-edc-flickr/pom.xml @@ -5,7 +5,7 @@ <parent> <artifactId>streams-provider-gnip</artifactId> <groupId>org.apache.streams</groupId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/pom.xml b/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/pom.xml index 1d760e7..03f74c1 100644 --- a/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/pom.xml +++ b/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/pom.xml @@ -5,7 +5,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-provider-gnip</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-gnip/gnip-edc-instagram/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-gnip/gnip-edc-instagram/pom.xml b/streams-contrib/streams-provider-gnip/gnip-edc-instagram/pom.xml index acdb119..eced509 100644 --- a/streams-contrib/streams-provider-gnip/gnip-edc-instagram/pom.xml +++ b/streams-contrib/streams-provider-gnip/gnip-edc-instagram/pom.xml @@ -5,7 +5,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-provider-gnip</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-gnip/gnip-edc-reddit/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-gnip/gnip-edc-reddit/pom.xml b/streams-contrib/streams-provider-gnip/gnip-edc-reddit/pom.xml index 8eb1b92..dea0089 100644 --- a/streams-contrib/streams-provider-gnip/gnip-edc-reddit/pom.xml +++ b/streams-contrib/streams-provider-gnip/gnip-edc-reddit/pom.xml @@ -5,7 +5,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-provider-gnip</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-gnip/gnip-edc-youtube/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-gnip/gnip-edc-youtube/pom.xml b/streams-contrib/streams-provider-gnip/gnip-edc-youtube/pom.xml index 14b5a31..b9e6e1e 100644 --- a/streams-contrib/streams-provider-gnip/gnip-edc-youtube/pom.xml +++ b/streams-contrib/streams-provider-gnip/gnip-edc-youtube/pom.xml @@ -5,7 +5,7 @@ <parent> <artifactId>streams-provider-gnip</artifactId> <groupId>org.apache.streams</groupId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-gnip/gnip-powertrack/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-gnip/gnip-powertrack/pom.xml b/streams-contrib/streams-provider-gnip/gnip-powertrack/pom.xml index ad17ae7..cbe47f5 100644 --- a/streams-contrib/streams-provider-gnip/gnip-powertrack/pom.xml +++ b/streams-contrib/streams-provider-gnip/gnip-powertrack/pom.xml @@ -5,7 +5,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-provider-gnip</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-gnip/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-gnip/pom.xml b/streams-contrib/streams-provider-gnip/pom.xml index fd80329..68d6591 100644 --- a/streams-contrib/streams-provider-gnip/pom.xml +++ b/streams-contrib/streams-provider-gnip/pom.xml @@ -5,7 +5,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-contrib</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-google/google-gmail/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/pom.xml b/streams-contrib/streams-provider-google/google-gmail/pom.xml index 2d72ce0..4240572 100644 --- a/streams-contrib/streams-provider-google/google-gmail/pom.xml +++ b/streams-contrib/streams-provider-google/google-gmail/pom.xml @@ -5,7 +5,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-provider-google</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-google/google-gplus/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/pom.xml b/streams-contrib/streams-provider-google/google-gplus/pom.xml index ac10497..facb6ef 100644 --- a/streams-contrib/streams-provider-google/google-gplus/pom.xml +++ b/streams-contrib/streams-provider-google/google-gplus/pom.xml @@ -5,7 +5,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-provider-google</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-google/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/pom.xml b/streams-contrib/streams-provider-google/pom.xml index 00feb60..b720b76 100644 --- a/streams-contrib/streams-provider-google/pom.xml +++ b/streams-contrib/streams-provider-google/pom.xml @@ -5,7 +5,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-contrib</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-moreover/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/pom.xml b/streams-contrib/streams-provider-moreover/pom.xml index 178d383..b3dcf8e 100644 --- a/streams-contrib/streams-provider-moreover/pom.xml +++ b/streams-contrib/streams-provider-moreover/pom.xml @@ -5,7 +5,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-contrib</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-rss/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/pom.xml b/streams-contrib/streams-provider-rss/pom.xml index ac258a9..c9f24d5 100644 --- a/streams-contrib/streams-provider-rss/pom.xml +++ b/streams-contrib/streams-provider-rss/pom.xml @@ -5,7 +5,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-contrib</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-sysomos/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/pom.xml b/streams-contrib/streams-provider-sysomos/pom.xml index 82534d7..9880457 100644 --- a/streams-contrib/streams-provider-sysomos/pom.xml +++ b/streams-contrib/streams-provider-sysomos/pom.xml @@ -5,7 +5,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-contrib</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-contrib/streams-provider-twitter/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml index 8153270..3c27b8c 100644 --- a/streams-contrib/streams-provider-twitter/pom.xml +++ b/streams-contrib/streams-provider-twitter/pom.xml @@ -5,7 +5,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-contrib</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-core/pom.xml ---------------------------------------------------------------------- diff --git a/streams-core/pom.xml b/streams-core/pom.xml index 4c81008..9546b5f 100644 --- a/streams-core/pom.xml +++ b/streams-core/pom.xml @@ -23,7 +23,7 @@ <parent> <artifactId>streams-project</artifactId> <groupId>org.apache.streams</groupId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>streams-core</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-osgi-components/activity-consumer/pom.xml ---------------------------------------------------------------------- diff --git a/streams-osgi-components/activity-consumer/pom.xml b/streams-osgi-components/activity-consumer/pom.xml index 6f6aa06..f4964d5 100644 --- a/streams-osgi-components/activity-consumer/pom.xml +++ b/streams-osgi-components/activity-consumer/pom.xml @@ -26,7 +26,7 @@ <parent> <groupId>org.apache.streams.osgi.components</groupId> <artifactId>streams-osgi-components</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <artifactId>activity-consumer</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-osgi-components/activity-registration/pom.xml ---------------------------------------------------------------------- diff --git a/streams-osgi-components/activity-registration/pom.xml b/streams-osgi-components/activity-registration/pom.xml index 46faabf..9ab8a74 100644 --- a/streams-osgi-components/activity-registration/pom.xml +++ b/streams-osgi-components/activity-registration/pom.xml @@ -26,7 +26,7 @@ <parent> <groupId>org.apache.streams.osgi.components</groupId> <artifactId>streams-osgi-components</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <artifactId>activity-registration</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-osgi-components/activity-subscriber/pom.xml ---------------------------------------------------------------------- diff --git a/streams-osgi-components/activity-subscriber/pom.xml b/streams-osgi-components/activity-subscriber/pom.xml index 4f8ee1a..367fee7 100644 --- a/streams-osgi-components/activity-subscriber/pom.xml +++ b/streams-osgi-components/activity-subscriber/pom.xml @@ -26,7 +26,7 @@ <parent> <groupId>org.apache.streams.osgi.components</groupId> <artifactId>streams-osgi-components</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <artifactId>activity-subscriber</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-osgi-components/pom.xml ---------------------------------------------------------------------- diff --git a/streams-osgi-components/pom.xml b/streams-osgi-components/pom.xml index 078586b..d7f68cd 100644 --- a/streams-osgi-components/pom.xml +++ b/streams-osgi-components/pom.xml @@ -25,7 +25,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-project</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <groupId>org.apache.streams.osgi.components</groupId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-osgi-components/streams-components-all/pom.xml ---------------------------------------------------------------------- diff --git a/streams-osgi-components/streams-components-all/pom.xml b/streams-osgi-components/streams-components-all/pom.xml index 2389cdf..448f1a4 100644 --- a/streams-osgi-components/streams-components-all/pom.xml +++ b/streams-osgi-components/streams-components-all/pom.xml @@ -24,7 +24,7 @@ <parent> <groupId>org.apache.streams.osgi.components</groupId> <artifactId>streams-osgi-components</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <artifactId>streams-components-all</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-pojo/pom.xml ---------------------------------------------------------------------- diff --git a/streams-pojo/pom.xml b/streams-pojo/pom.xml index 8a9f5c0..a3a12f6 100644 --- a/streams-pojo/pom.xml +++ b/streams-pojo/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-project</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-runtimes/pom.xml ---------------------------------------------------------------------- diff --git a/streams-runtimes/pom.xml b/streams-runtimes/pom.xml index 47418f1..5d43c28 100644 --- a/streams-runtimes/pom.xml +++ b/streams-runtimes/pom.xml @@ -25,7 +25,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-project</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <artifactId>streams-runtimes</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-runtimes/streams-runtime-local/pom.xml ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/pom.xml b/streams-runtimes/streams-runtime-local/pom.xml index 2e9d6e3..b7ddb9a 100644 --- a/streams-runtimes/streams-runtime-local/pom.xml +++ b/streams-runtimes/streams-runtime-local/pom.xml @@ -25,7 +25,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-runtimes</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <artifactId>streams-runtime-local</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java index 8006560..a7f988e 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.streams.core.StreamsDatum; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; +import org.apache.streams.util.ComponentUtils; import org.apache.streams.util.SerializationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +75,7 @@ public abstract class BaseStreamsTask implements StreamsTask { */ protected void addToOutgoingQueue(StreamsDatum datum) { if(this.outQueues.size() == 1) { - enqueue(outQueues.get(0), datum); + ComponentUtils.offerUntilSuccess(datum, outQueues.get(0)); } else { StreamsDatum newDatum = null; @@ -82,7 +83,7 @@ public abstract class BaseStreamsTask implements StreamsTask { try { newDatum = cloneStreamsDatum(datum); if(newDatum != null) { - enqueue(queue, newDatum); + ComponentUtils.offerUntilSuccess(newDatum, queue); } } catch (RuntimeException e) { LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum); @@ -145,15 +146,6 @@ public abstract class BaseStreamsTask implements StreamsTask { return this.inIndex; } - private void enqueue( Queue<StreamsDatum> queue, StreamsDatum entry ) { - boolean success; - do { - success = queue.offer(entry); - Thread.yield(); - } - while( !success ); - } - private StreamsDatum copyMetaData(StreamsDatum copyFrom, StreamsDatum copyTo) { Map<String, Object> fromMeta = copyFrom.getMetadata(); Map<String, Object> toMeta = copyTo.getMetadata(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-runtimes/streams-runtime-pig/pom.xml ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/pom.xml b/streams-runtimes/streams-runtime-pig/pom.xml index 6d91a2f..e62a7c8 100644 --- a/streams-runtimes/streams-runtime-pig/pom.xml +++ b/streams-runtimes/streams-runtime-pig/pom.xml @@ -23,7 +23,7 @@ <parent> <artifactId>streams-runtimes</artifactId> <groupId>org.apache.streams</groupId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>streams-runtime-pig</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-runtimes/streams-runtime-storm/pom.xml ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-storm/pom.xml b/streams-runtimes/streams-runtime-storm/pom.xml index 921d346..edaa760 100644 --- a/streams-runtimes/streams-runtime-storm/pom.xml +++ b/streams-runtimes/streams-runtime-storm/pom.xml @@ -23,7 +23,7 @@ <parent> <artifactId>streams-runtimes</artifactId> <groupId>org.apache.streams</groupId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>streams-runtime-storm</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-runtimes/streams-runtime-webapp/pom.xml ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-webapp/pom.xml b/streams-runtimes/streams-runtime-webapp/pom.xml index c1d21b7..23b00f3 100644 --- a/streams-runtimes/streams-runtime-webapp/pom.xml +++ b/streams-runtimes/streams-runtime-webapp/pom.xml @@ -25,7 +25,7 @@ <parent> <groupId>org.apache.streams</groupId> <artifactId>streams-runtimes</artifactId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <artifactId>streams-runtime-webapp</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-util/pom.xml ---------------------------------------------------------------------- diff --git a/streams-util/pom.xml b/streams-util/pom.xml index 5249021..0a48ec9 100644 --- a/streams-util/pom.xml +++ b/streams-util/pom.xml @@ -23,7 +23,7 @@ <parent> <artifactId>streams-project</artifactId> <groupId>org.apache.streams</groupId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f877c5fa/streams-web/pom.xml ---------------------------------------------------------------------- diff --git a/streams-web/pom.xml b/streams-web/pom.xml index 612fe72..89ca1b5 100644 --- a/streams-web/pom.xml +++ b/streams-web/pom.xml @@ -22,7 +22,7 @@ <parent> <artifactId>streams-project</artifactId> <groupId>org.apache.streams</groupId> - <version>0.1-SPRINGCLEANING</version> + <version>0.1-SNAPSHOT</version> </parent> <artifactId>streams-web</artifactId> <packaging>war</packaging>
