fixed datum leak
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c2a858db Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c2a858db Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c2a858db Branch: refs/heads/master Commit: c2a858db2721175453901b428440e3c45343fd9b Parents: 6adb12a Author: Steve Blackmon <[email protected]> Authored: Mon Mar 24 17:33:46 2014 -0500 Committer: Steve Blackmon <[email protected]> Committed: Mon Mar 24 17:33:46 2014 -0500 ---------------------------------------------------------------------- .../apache/streams/local/tasks/BaseStreamsTask.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c2a858db/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java index b9af0fd..3799480 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java @@ -76,15 +76,16 @@ public abstract class BaseStreamsTask implements StreamsTask { */ protected void addToOutgoingQueue(StreamsDatum datum) { if(this.outQueues.size() == 1) { - this.outQueues.get(0).offer(datum); + enqueue(outQueues.get(0), datum); } else { StreamsDatum newDatum = null; for(Queue<StreamsDatum> queue : this.outQueues) { try { newDatum = cloneStreamsDatum(datum); - if(newDatum != null) - queue.offer(newDatum); + if(newDatum != null) { + enqueue(queue, newDatum); + } } catch (RuntimeException e) { LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum); LOGGER.error("Exception while offering StreamsDatum to outgoing queue: {}", e); @@ -146,4 +147,12 @@ public abstract class BaseStreamsTask implements StreamsTask { return this.inIndex; } + private void enqueue( Queue<StreamsDatum> queue, StreamsDatum entry ) { + boolean success; + do { + success = queue.offer(entry); + Thread.yield(); + } + while( !success ); + } }
