Updated Branches: refs/heads/flume-1.5 c6117b50d -> d1b85bd8e
FLUME-2275. Improve scalability of MorphlineInterceptor under contention

(Wolfgang Hoschek via Hari Shreedharan)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d1b85bd8
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d1b85bd8
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d1b85bd8

Branch: refs/heads/flume-1.5
Commit: d1b85bd8ed27e376c762d6514a9b874e5ac8810f
Parents: c6117b5
Author: Hari Shreedharan <hshreedha...@apache.org>
Authored: Thu Jan 2 18:22:31 2014 -0800
Committer: Hari Shreedharan <hshreedha...@apache.org>
Committed: Thu Jan 2 18:23:56 2014 -0800

----------------------------------------------------------------------
 .../solr/morphline/MorphlineInterceptor.java | 19 +++++++------------
 1 file changed, 7 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/d1b85bd8/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
index 8e5e4b3..ef8f716 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java
@@ -23,18 +23,18 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.interceptor.Interceptor;
-
 import org.kitesdk.morphline.api.Command;
 import org.kitesdk.morphline.api.Record;
 import org.kitesdk.morphline.base.Fields;
+
 import com.google.common.base.Preconditions;
 import com.google.common.io.ByteStreams;
 
@@ -47,7 +47,7 @@ import com.google.common.io.ByteStreams;
 public class MorphlineInterceptor implements Interceptor {
 
   private final Context context;
-  private final BlockingQueue<LocalMorphlineInterceptor> pool = new LinkedBlockingQueue();
+  private final Queue<LocalMorphlineInterceptor> pool = new ConcurrentLinkedQueue<LocalMorphlineInterceptor>();
 
   protected MorphlineInterceptor(Context context) {
     Preconditions.checkNotNull(context);
@@ -61,9 +61,8 @@ public class MorphlineInterceptor implements Interceptor {
 
   @Override
   public void close() {
-    List<LocalMorphlineInterceptor> interceptors = new ArrayList();
-    pool.drainTo(interceptors);
-    for (LocalMorphlineInterceptor interceptor : interceptors) {
+    LocalMorphlineInterceptor interceptor;
+    while ((interceptor = pool.poll()) != null) {
       interceptor.close();
     }
   }
@@ -85,11 +84,7 @@ public class MorphlineInterceptor implements Interceptor {
   }
 
   private void returnToPool(LocalMorphlineInterceptor interceptor) {
-    try {
-      pool.put(interceptor);
-    } catch (InterruptedException e) {
-      throw new FlumeException(e);
-    }
+    pool.add(interceptor);
   }
 
   private LocalMorphlineInterceptor borrowFromPool() {