Updated Branches: refs/heads/trunk 458a1a2f7 -> d4c1bc07b
FLUME-1981. Rpc client expiration can be done in a more thread-safe way. (Hari Shreedharan via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d4c1bc07 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d4c1bc07 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d4c1bc07 Branch: refs/heads/trunk Commit: d4c1bc07bc33e10cfdc5eca11b085e269dbea60d Parents: 458a1a2 Author: Mike Percy <[email protected]> Authored: Wed May 8 15:56:33 2013 -0700 Committer: Mike Percy <[email protected]> Committed: Wed May 8 15:56:33 2013 -0700 ---------------------------------------------------------------------- .../org/apache/flume/sink/AbstractRpcSink.java | 17 ++++++++++++++- 1 files changed, 16 insertions(+), 1 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/d4c1bc07/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java index 892c949..b3208fc 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java @@ -43,6 +43,8 @@ import java.util.Properties; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * This sink provides the basic RPC functionality for Flume. This sink takes @@ -151,6 +153,7 @@ public abstract class AbstractRpcSink extends AbstractSink private final ScheduledExecutorService cxnResetExecutor = Executors .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() .setNameFormat("Rpc Sink Reset Thread").build()); + private final Lock resetLock = new ReentrantLock(); @Override public void configure(Context context) { @@ -211,7 +214,17 @@ public abstract class AbstractRpcSink extends AbstractSink cxnResetExecutor.schedule(new Runnable() { @Override public void run() { - destroyConnection(); + resetLock.lock(); + try { + destroyConnection(); + createConnection(); + } catch (Throwable throwable) { + //Don't rethrow, else this runnable won't get scheduled again. + logger.error("Error while trying to expire connection", + throwable); + } finally { + resetLock.unlock(); + } } }, cxnResetInterval, TimeUnit.SECONDS); } @@ -319,6 +332,7 @@ public abstract class AbstractRpcSink extends AbstractSink Channel channel = getChannel(); Transaction transaction = channel.getTransaction(); + resetLock.lock(); try { transaction.begin(); @@ -368,6 +382,7 @@ public abstract class AbstractRpcSink extends AbstractSink throw new EventDeliveryException("Failed to send events", t); } } finally { + resetLock.unlock(); transaction.close(); }
