Updated Branches: refs/heads/trunk d4c1bc07b -> cdb4766f1
FLUME-1823. LoadBalancingRpcClient method must throw exception if it is called after close is called. (Hari Shreedharan via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/cdb4766f Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/cdb4766f Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/cdb4766f Branch: refs/heads/trunk Commit: cdb4766f1a907a8dfedc1f7a55b3b4edbe3cdaea Parents: d4c1bc0 Author: Mike Percy <[email protected]> Authored: Wed May 8 16:02:37 2013 -0700 Committer: Mike Percy <[email protected]> Committed: Wed May 8 16:02:37 2013 -0700 ---------------------------------------------------------------------- .../apache/flume/api/LoadBalancingRpcClient.java | 19 +++++-- .../flume/api/TestLoadBalancingRpcClient.java | 41 +++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/cdb4766f/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java index f396104..e5fcc36 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java @@ -57,9 +57,11 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { private HostSelector selector; private Map<String, RpcClient> clientMap; private Properties configurationProperties; + private volatile boolean isOpen = false; @Override public void append(Event event) throws EventDeliveryException { + throwIfClosed(); boolean eventSent = false; Iterator<HostInfo> it = selector.createHostIterator(); @@ -83,6 +85,7 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { @Override public void appendBatch(List<Event> events) throws EventDeliveryException { + throwIfClosed(); boolean batchSent = false; Iterator<HostInfo> it = selector.createHostIterator(); @@ -106,13 +109,18 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { @Override public boolean isActive() { - // This client is always active and does not need to be replaced. - // Internally it will test the delegates and replace them where needed. - return true; + return isOpen; + } + + private void throwIfClosed() throws EventDeliveryException { + if (!isOpen) { + throw new EventDeliveryException("Rpc Client is closed"); + } } @Override public void close() throws FlumeException { + isOpen = false; synchronized (this) { Iterator<String> it = clientMap.keySet().iterator(); while (it.hasNext()) { @@ -177,11 +185,12 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { } selector.setHosts(hosts); + isOpen = true; } private synchronized RpcClient getClient(HostInfo info) - throws FlumeException { - + throws FlumeException, EventDeliveryException { + throwIfClosed(); String name = info.getReferenceName(); RpcClient client = clientMap.get(name); if (client == null) { http://git-wip-us.apache.org/repos/asf/flume/blob/cdb4766f/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java index 9071734..5d6828b 100644 --- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java +++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java @@ -97,6 +97,47 @@ public class TestLoadBalancingRpcClient { } } + // This will fail without FLUME-1823 + @Test(expected = EventDeliveryException.class) + public void testTwoHostFailoverThrowAfterClose() throws Exception { + Server s1 = null, s2 = null; + RpcClient c = null; + try{ + LoadBalancedAvroHandler h1 = new LoadBalancedAvroHandler(); + LoadBalancedAvroHandler h2 = new LoadBalancedAvroHandler(); + + s1 = RpcTestUtils.startServer(h1); + s2 = RpcTestUtils.startServer(h2); + + Properties p = new Properties(); + p.put("hosts", "h1 h2"); + p.put("client.type", "default_loadbalance"); + p.put("hosts.h1", "127.0.0.1:" + s1.getPort()); + p.put("hosts.h2", "127.0.0.1:" + s2.getPort()); + + c = RpcClientFactory.getInstance(p); + Assert.assertTrue(c instanceof LoadBalancingRpcClient); + + for (int i = 0; i < 100; i++) { + if (i == 20) { + h2.setFailed(); + } else if (i == 40) { + h2.setOK(); + } + c.append(getEvent(i)); + } + + Assert.assertEquals(60, h1.getAppendCount()); + Assert.assertEquals(40, h2.getAppendCount()); + if (c != null) c.close(); + c.append(getEvent(3)); + Assert.fail(); + } finally { + if (s1 != null) s1.close(); + if (s2 != null) s2.close(); + } + } + /** * Ensure that we can tolerate a host that is completely down. * @throws Exception
