Updated Branches: refs/heads/trunk 5d49eeb73 -> 99db32ccd
FLUME-2154. Reducing duplicate events caused by reset-connection-interval (Juhani Connolly via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/99db32cc Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/99db32cc Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/99db32cc Branch: refs/heads/trunk Commit: 99db32ccd163daf9d7685f0e8485941701e1133d Parents: 5d49eeb Author: Hari Shreedharan <[email protected]> Authored: Thu Aug 8 23:44:59 2013 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Thu Aug 8 23:45:57 2013 -0700 ---------------------------------------------------------------------- .../org/apache/flume/sink/AbstractRpcSink.java | 37 ++++++++++++-------- .../org/apache/flume/sink/TestAvroSink.java | 6 ++++ 2 files changed, 29 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/99db32cc/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java index b3208fc..5146834 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java @@ -43,6 +43,7 @@ import java.util.Properties; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -149,11 +150,11 @@ public abstract class AbstractRpcSink extends AbstractSink private Properties clientProps; private SinkCounter sinkCounter; private int cxnResetInterval; + private AtomicBoolean resetConnectionFlag; private final int DEFAULT_CXN_RESET_INTERVAL = 0; private final ScheduledExecutorService cxnResetExecutor = Executors .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() .setNameFormat("Rpc Sink Reset Thread").build()); - private final Lock resetLock = new ReentrantLock(); @Override public void configure(Context context) { @@ -206,6 +207,7 @@ public abstract class AbstractRpcSink extends AbstractSink "port: {}", new Object[] { getName(), hostname, port }); try { + resetConnectionFlag = new AtomicBoolean(false); client = initializeRpcClient(clientProps); Preconditions.checkNotNull(client, "Rpc Client could not be " + "initialized. " + getName() + " could not be started"); @@ -214,17 +216,7 @@ public abstract class AbstractRpcSink extends AbstractSink cxnResetExecutor.schedule(new Runnable() { @Override public void run() { - resetLock.lock(); - try { - destroyConnection(); - createConnection(); - } catch (Throwable throwable) { - //Don't rethrow, else this runnable won't get scheduled again. - logger.error("Error while trying to expire connection", - throwable); - } finally { - resetLock.unlock(); - } + resetConnectionFlag.set(true); } }, cxnResetInterval, TimeUnit.SECONDS); } @@ -241,6 +233,17 @@ public abstract class AbstractRpcSink extends AbstractSink } + private void resetConnection() { + try { + destroyConnection(); + createConnection(); + } catch (Throwable throwable) { + //Don't rethrow, else this runnable won't get scheduled again. + logger.error("Error while trying to expire connection", + throwable); + } + } + private void destroyConnection() { if (client != null) { logger.debug("Rpc sink {} closing Rpc client: {}", getName(), client); @@ -332,7 +335,14 @@ public abstract class AbstractRpcSink extends AbstractSink Channel channel = getChannel(); Transaction transaction = channel.getTransaction(); - resetLock.lock(); + if(resetConnectionFlag.get()) { + resetConnection(); + // if the time to reset is long and the timeout is short + // this may cancel the next reset request + // this should however not be an issue + resetConnectionFlag.set(false); + } + try { transaction.begin(); @@ -382,7 +392,6 @@ public abstract class AbstractRpcSink extends AbstractSink throw new EventDeliveryException("Failed to send events", t); } } finally { - resetLock.unlock(); transaction.close(); } http://git-wip-us.apache.org/repos/asf/flume/blob/99db32cc/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java index 8760c25..757a536 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java +++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java @@ -300,6 +300,12 @@ public class TestAvroSink { sink.start(); RpcClient firstClient = sink.getUnderlyingClient(); Thread.sleep(6000); + Transaction t = channel.getTransaction(); + t.begin(); + channel.put(EventBuilder.withBody("This is a test", Charset.defaultCharset())); + t.commit(); + t.close(); + sink.process(); // Make sure they are not the same object, connection should be reset Assert.assertFalse(firstClient == sink.getUnderlyingClient()); sink.stop();
