Updated Branches: refs/heads/flume-1.3.0 939bf1074 -> 752f70f4f
FLUME-1541. Implement failover for LoadBalancingSinkProcessor. (Juhani Connolly via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/752f70f4 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/752f70f4 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/752f70f4 Branch: refs/heads/flume-1.3.0 Commit: 752f70f4fd053fa53650c0d7673a99c7610464f9 Parents: 939bf10 Author: Mike Percy <[email protected]> Authored: Thu Sep 6 01:48:53 2012 -0700 Committer: Mike Percy <[email protected]> Committed: Thu Sep 6 01:51:41 2012 -0700 ---------------------------------------------------------------------- .../apache/flume/sink/AbstractSinkSelector.java | 5 + .../flume/sink/LoadBalancingSinkProcessor.java | 140 +++++++++- .../flume/sink/TestLoadBalancingSinkProcessor.java | 219 ++++++++++++++- flume-ng-doc/sphinx/FlumeUserGuide.rst | 40 ++- 4 files changed, 378 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/752f70f4/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java index 63397a5..3e806a7 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java @@ -63,4 +63,9 @@ public abstract class AbstractSinkSelector implements SinkSelector { protected List<Sink> getSinks() { return sinkList; } + + @Override + public void informSinkFailed(Sink failedSink) { + // no-op: this base selector keeps no failure state; AbstractBackoffSinkSelector overrides it + } } http://git-wip-us.apache.org/repos/asf/flume/blob/752f70f4/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java 
---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java index 18d4509..93a46a0 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java @@ -19,8 +19,10 @@ package org.apache.flume.sink; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Random; import org.apache.flume.Context; @@ -78,13 +80,13 @@ import com.google.common.base.Preconditions; * @see LoadBalancingSinkProcessor.SinkSelector */ public class LoadBalancingSinkProcessor extends AbstractSinkProcessor { - public static final String CONFIG_SELECTOR = "selector"; public static final String CONFIG_SELECTOR_PREFIX = CONFIG_SELECTOR + "."; public static final String SELECTOR_NAME_ROUND_ROBIN = "ROUND_ROBIN"; public static final String SELECTOR_NAME_RANDOM = "RANDOM"; - + public static final String SELECTOR_NAME_ROUND_ROBIN_BACKOFF = "ROUND_ROBIN_BACKOFF"; + public static final String SELECTOR_NAME_RANDOM_BACKOFF = "RANDOM_BACKOFF"; private static final Logger LOGGER = LoggerFactory .getLogger(LoadBalancingSinkProcessor.class); @@ -106,6 +108,10 @@ public class LoadBalancingSinkProcessor extends AbstractSinkProcessor { selector = new RoundRobinSinkSelector(); } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) { selector = new RandomOrderSinkSelector(); + } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN_BACKOFF)) { + selector = new BackoffRoundRobinSinkSelector(); + } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM_BACKOFF)) { + selector = new BackoffRandomOrderSinkSelector(); } else { try { @SuppressWarnings("unchecked") @@ -151,6 +157,7 @@ public class 
LoadBalancingSinkProcessor extends AbstractSinkProcessor { status = sink.process(); break; } catch (Exception ex) { + selector.informSinkFailed(sink); LOGGER.warn("Sink failed to consume event. " + "Attempting next sink if available.", ex); } @@ -191,6 +198,8 @@ public class LoadBalancingSinkProcessor extends AbstractSinkProcessor { void setSinks(List<Sink> sinks); Iterator<Sink> createSinkIterator(); + + void informSinkFailed(Sink failedSink); } /** @@ -248,4 +257,131 @@ public class LoadBalancingSinkProcessor extends AbstractSinkProcessor { return new SpecificOrderIterator<Sink>(indexOrder, getSinks()); } } + + private static class FailureState { + long lastFail; + long restoreTime; + int sequentialFails; + } + + public static abstract class AbstractBackoffSinkSelector extends AbstractSinkSelector { + // caps sequentialFails: 2 ^ 16 seconds is far beyond any useful maxBackoffMillis + private static final int EXP_BACKOFF_COUNTER_LIMIT = 16; + private static final String CONF_MAX_TIMEOUT = "maxBackoffMillis"; + private static final long CONSIDER_SEQUENTIAL_RANGE = 2000L; + private static final long MAX_TIMEOUT = 30000L; + + protected List<FailureState> sinkStates; + protected Map<Sink, FailureState> stateMap; + protected long maxTimeout = MAX_TIMEOUT; + + @Override + public void configure(Context context) { + super.configure(context); + maxTimeout = context.getLong(CONF_MAX_TIMEOUT, MAX_TIMEOUT); + } + + @Override + public void setSinks(List<Sink> sinks) { + super.setSinks(sinks); + sinkStates = new ArrayList<FailureState>(); + stateMap = new HashMap<Sink, FailureState>(); + for(Sink sink : sinks) { + FailureState state = new FailureState(); + sinkStates.add(state); + stateMap.put(sink, state); + } + } + + @Override + public void informSinkFailed(Sink failedSink) { + super.informSinkFailed(failedSink); + FailureState state = stateMap.get(failedSink); + long now = System.currentTimeMillis(); + long delta = now - state.lastFail; + // a failure within (previous backoff + CONSIDER_SEQUENTIAL_RANGE) counts as sequential + long 
lastBackoffLength = Math.min(maxTimeout,
1000 * (1 << state.sequentialFails)); + long allowableDiff = lastBackoffLength + CONSIDER_SEQUENTIAL_RANGE; + if( allowableDiff > delta ) { + if(state.sequentialFails < EXP_BACKOFF_COUNTER_LIMIT) + state.sequentialFails++; + } else { + state.sequentialFails = 1; + } + state.lastFail = now; + state.restoreTime = now + Math.min(maxTimeout, 1000 * (1 << state.sequentialFails)); + } + + } + + /** Round-robin selector that rotates only over sinks that are not currently backed off. */ + private static class BackoffRoundRobinSinkSelector extends AbstractBackoffSinkSelector { + private int nextHead = 0; + + @Override + public Iterator<Sink> createSinkIterator() { + long curTime = System.currentTimeMillis(); + List<Integer> activeIndices = new ArrayList<Integer>(); + int index = 0; + for(FailureState state : sinkStates) { + if (state.restoreTime < curTime) { + activeIndices.add(index); + } + index++; + } + + int size = activeIndices.size(); + // the active set may have shrunk since the last call, so wrap nextHead back into range + if(nextHead >= size) { + nextHead = 0; + } + int begin = nextHead++; + if (nextHead == activeIndices.size()) { + nextHead = 0; + } + + int[] indexOrder = new int[size]; + + for (int i=0; i < size; i++) { + indexOrder[i] = activeIndices.get((begin + i) % size); + } + + return new SpecificOrderIterator<Sink>(indexOrder, getSinks()); + } + } + + /** + * A sink selector that implements a random sink selection policy over the sinks + * that are not currently backed off. This implementation is not thread safe.
+ */ private static class BackoffRandomOrderSinkSelector extends AbstractBackoffSinkSelector { + private Random random = new Random(System.currentTimeMillis()); + + @Override + public Iterator<Sink> createSinkIterator() { + long now = System.currentTimeMillis(); + + List<Integer> indexList = new ArrayList<Integer>(); + + int i = 0; + for (FailureState state : sinkStates) { + if(state.restoreTime < now) + indexList.add(i); + i++; + } + + int size = indexList.size(); + int[] indexOrder = new int[size]; + // size is 0 when every sink is backed off; never call nextInt(0) or get(0) then + while (indexList.size() > 1) { + int pick = random.nextInt(indexList.size()); + indexOrder[indexList.size() - 1] = indexList.remove(pick); + } + + if (size > 0) { indexOrder[0] = indexList.get(0); } + + return new SpecificOrderIterator<Sink>(indexOrder, getSinks()); + } + } + } http://git-wip-us.apache.org/repos/asf/flume/blob/752f70f4/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java index 1e9c94e..981f88e 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java +++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java @@ -113,7 +113,7 @@ public class TestLoadBalancingSinkProcessor { s1.setChannel(ch); // s1 always fails - s1.setFail(); + s1.setFail(true); MockSink s2 = new MockSink(2); s2.setChannel(ch); @@ -123,7 +123,7 @@ public class TestLoadBalancingSinkProcessor { s3.setChannel(ch); // s3 always fails - s3.setFail(); + s3.setFail(true); List<Sink> sinks = new ArrayList<Sink>(); sinks.add(s1); @@ -143,6 +143,65 @@ public class TestLoadBalancingSinkProcessor { } @Test + public void testRandomBackoff() throws Exception { + Channel ch = new MockChannel(); + int n = 
100; + int numEvents = n; + for (int i = 0; i <
numEvents; i++) { + ch.put(new MockEvent("test" + i)); + } + + MockSink s1 = new MockSink(1); + s1.setChannel(ch); + + // s1 always fails + s1.setFail(true); + + MockSink s2 = new MockSink(2); + s2.setChannel(ch); + + MockSink s3 = new MockSink(3); + s3.setChannel(ch); + + // s3 always fails + s3.setFail(true); + + List<Sink> sinks = new ArrayList<Sink>(); + sinks.add(s1); + sinks.add(s2); + sinks.add(s3); + + LoadBalancingSinkProcessor lbsp = getProcessor("random_backoff", sinks); + + // TODO: there is a remote possibility that s1 or s3 + // never get tried before s2 by the random assignment + // and thus never get backed off, causing the test to fail + for(int i=0; i < 50; i++) { + // a well behaved runner would always check the return. + lbsp.process(); + } + Assert.assertEquals(50, s2.getEvents().size()); + s2.setFail(true); + s1.setFail(false); // s1 should still be backed off + try { + lbsp.process(); + // nothing should be able to process right now + Assert.fail("Expected EventDeliveryException"); + } catch (EventDeliveryException e) { + // this is expected + } + Thread.sleep(2100); // wait for s1 to no longer be backed off + Sink.Status s = Sink.Status.READY; + while (s != Sink.Status.BACKOFF) { + s = lbsp.process(); + } + + Assert.assertEquals(50, s1.getEvents().size()); + Assert.assertEquals(50, s2.getEvents().size()); + Assert.assertEquals(0, s3.getEvents().size()); + } + + @Test public void testRandomPersistentFailure() throws Exception { Channel ch = new MockChannel(); int n = 100; @@ -158,7 +217,7 @@ public class TestLoadBalancingSinkProcessor { s2.setChannel(ch); // s2 always fails - s2.setFail(); + s2.setFail(true); MockSink s3 = new MockSink(3); s3.setChannel(ch); @@ -272,7 +331,7 @@ public class TestLoadBalancingSinkProcessor { s1.setChannel(ch); // s1 always fails - s1.setFail(); + s1.setFail(true); MockSink s2 = new MockSink(2); s2.setChannel(ch); @@ -282,7 +341,7 @@ public class TestLoadBalancingSinkProcessor { s3.setChannel(ch); // s3 always fails -
s3.setFail(); + s3.setFail(true); List<Sink> sinks = new ArrayList<Sink>(); sinks.add(s1); @@ -317,7 +376,7 @@ public class TestLoadBalancingSinkProcessor { s2.setChannel(ch); // s2 always fails - s2.setFail(); + s2.setFail(true); MockSink s3 = new MockSink(3); s3.setChannel(ch); @@ -339,6 +398,148 @@ public class TestLoadBalancingSinkProcessor { Assert.assertTrue(s3.getEvents().size() == 2*n); } + // test that even if the sink recovers immediately that it is kept out of commission briefly + // test also verifies that when a sink fails, events are balanced over remaining sinks + @Test + public void testRoundRobinBackoffInitialFailure() throws EventDeliveryException { + Channel ch = new MockChannel(); + int n = 100; + int numEvents = 3*n; + for (int i = 0; i < numEvents; i++) { + ch.put(new MockEvent("test" + i)); + } + + MockSink s1 = new MockSink(1); + s1.setChannel(ch); + + MockSink s2 = new MockSink(2); + s2.setChannel(ch); + + MockSink s3 = new MockSink(3); + s3.setChannel(ch); + + List<Sink> sinks = new ArrayList<Sink>(); + sinks.add(s1); + sinks.add(s2); + sinks.add(s3); + + LoadBalancingSinkProcessor lbsp = getProcessor("round_robin_backoff",sinks); + + Status s = Status.READY; + for (int i = 0; i < 3 && s != Status.BACKOFF; i++) { + s = lbsp.process(); + } + s2.setFail(true); + for (int i = 0; i < 3 && s != Status.BACKOFF; i++) { + s = lbsp.process(); + } + s2.setFail(false); + while (s != Status.BACKOFF) { + s = lbsp.process(); + } + + Assert.assertEquals((3 * n) / 2, s1.getEvents().size()); + Assert.assertEquals(1, s2.getEvents().size()); + Assert.assertEquals((3 * n) /2 - 1, s3.getEvents().size()); + } + + @Test + public void testRoundRobinBackoffIncreasingBackoffs() throws EventDeliveryException, InterruptedException { + Channel ch = new MockChannel(); + int n = 100; + int numEvents = 3*n; + for (int i = 0; i < numEvents; i++) { + ch.put(new MockEvent("test" + i)); + } + + MockSink s1 = new MockSink(1); + s1.setChannel(ch); + + MockSink s2 = new
MockSink(2); + s2.setChannel(ch); + s2.setFail(true); + + MockSink s3 = new MockSink(3); + s3.setChannel(ch); + + List<Sink> sinks = new ArrayList<Sink>(); + sinks.add(s1); + sinks.add(s2); + sinks.add(s3); + + LoadBalancingSinkProcessor lbsp = getProcessor("round_robin_backoff",sinks); + + Status s = Status.READY; + for (int i = 0; i < 3 && s != Status.BACKOFF; i++) { + s = lbsp.process(); + } + Assert.assertEquals(0, s2.getEvents().size()); + Thread.sleep(2100); + // this should let the sink come out of backoff and get backed off for a longer time + for (int i = 0; i < 3 && s != Status.BACKOFF; i++) { + s = lbsp.process(); + } + Assert.assertEquals(0, s2.getEvents().size()); + s2.setFail(false); + Thread.sleep(2100); + // this time it shouldn't come out of backoff yet as the timeout isn't over + for (int i = 0; i < 3 && s != Status.BACKOFF; i++) { + s = lbsp.process(); + } + Assert.assertEquals(0, s2.getEvents().size()); + // after this s2 should be receiving events again + Thread.sleep(2100); + while (s != Status.BACKOFF) { + s = lbsp.process(); + } + + Assert.assertEquals( n + 2, s1.getEvents().size()); + Assert.assertEquals( n - 3, s2.getEvents().size()); + Assert.assertEquals( n + 1, s3.getEvents().size()); + } + + @Test + public void testRoundRobinBackoffFailureRecovery() throws EventDeliveryException, InterruptedException { + Channel ch = new MockChannel(); + int n = 100; + int numEvents = 3*n; + for (int i = 0; i < numEvents; i++) { + ch.put(new MockEvent("test" + i)); + } + + MockSink s1 = new MockSink(1); + s1.setChannel(ch); + + MockSink s2 = new MockSink(2); + s2.setChannel(ch); + s2.setFail(true); + + MockSink s3 = new MockSink(3); + s3.setChannel(ch); + + List<Sink> sinks = new ArrayList<Sink>(); + sinks.add(s1); + sinks.add(s2); + sinks.add(s3); + + LoadBalancingSinkProcessor lbsp = getProcessor("round_robin_backoff",sinks); + + Status s = Status.READY; + for (int i = 0; i < 3 && s != Status.BACKOFF; i++) { + s = lbsp.process(); + } +
s2.setFail(false); + Thread.sleep(2100); // margin past the 2s backoff, as in the other backoff tests + while (s != Status.BACKOFF) { + s = lbsp.process(); + } + + Assert.assertEquals(n + 1, s1.getEvents().size()); + Assert.assertEquals(n - 1, s2.getEvents().size()); + Assert.assertEquals(n, s3.getEvents().size()); + } + + @Test public void testRoundRobinNoFailure() throws Exception { @@ -388,7 +589,7 @@ public class TestLoadBalancingSinkProcessor { s1.setChannel(ch); // s1 always fails - s1.setFail(); + s1.setFail(true); MockSink s2 = new MockSink(2); s2.setChannel(ch); @@ -436,8 +637,8 @@ public class TestLoadBalancingSinkProcessor { return id; } - void setFail() { - fail = true; + void setFail(boolean shouldFail) { + fail = shouldFail; } @Override http://git-wip-us.apache.org/repos/asf/flume/blob/752f70f4/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index ffed72b..f9f2383 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1603,28 +1603,38 @@ Load balancing Sink Processor Load balancing sink processor provides the ability to load-balance flow over multiple sinks. It maintains an indexed list of active sinks on which the load must be distributed. Implementation supports distributing load using -either via ``ROUND_ROBIN`` or via ``RANDOM`` selection mechanism. The choice -of selection mechanism defaults to ``ROUND_ROBIN`` type, but can be overridden +either via ``ROUND_ROBIN``, ``RANDOM``, ``ROUND_ROBIN_BACKOFF``, or +``RANDOM_BACKOFF`` selection mechanisms. The choice of selection mechanism +defaults to ``ROUND_ROBIN`` type, but can be overridden via configuration. Custom selection mechanisms are supported via custom classes that inherits from ``LoadBalancingSelector``. When invoked, this selector picks the next sink using its configured selection -mechanism and invokes it.
In case the selected sink fails to deliver the event, -the processor picks the next available sink via its configured selection mechanism. -This implementation does not blacklist the failing sink and instead continues -to optimistically attempt every available sink. If all sinks invocations -result in failure, the selector propagates the failure to the sink runner. +mechanism and invokes it. With ``ROUND_ROBIN`` and ``RANDOM``, if the selected sink +fails to deliver the event, the processor picks the next available sink via +its configured selection mechanism. These selectors do not blacklist +the failing sink and instead continue to optimistically attempt every +available sink. If all sink invocations result in failure, the processor +propagates the failure to the sink runner. The ``_BACKOFF`` variants blacklist +sinks that fail, removing them from selection for a given timeout. When the +timeout ends, if the sink is still unresponsive, the timeout is increased +exponentially to avoid potentially getting stuck in long waits on unresponsive +sinks. + + Required properties are in **bold**. -============================= =============== =============================================================== -Property Name Default Description -============================= =============== =============================================================== -**processor.sinks** -- Space separated list of sinks that are participating in the group -**processor.type** ``default`` The component type name, needs to be ``load_balance`` -processor.selector ``ROUND_ROBIN`` Selection mechanism.
Must be either ``ROUND_ROBIN``, ``RANDOM`` - or custom FQDN to class that inherits from ``LoadBalancingSelector`` -============================= =============== =============================================================== +==================================== =============== =============================================================== +Property Name Default Description +==================================== =============== =============================================================== +**processor.sinks** -- Space separated list of sinks that are participating in the group +**processor.type** ``default`` The component type name, needs to be ``load_balance`` +processor.selector ``ROUND_ROBIN`` Selection mechanism. Must be one of ``ROUND_ROBIN``, ``RANDOM``, + ``ROUND_ROBIN_BACKOFF``, ``RANDOM_BACKOFF``, or the FQCN of a + class that inherits from ``LoadBalancingSelector`` +processor.selector.maxBackoffMillis 30000 Used by backoff selectors to cap the exponential backoff, in milliseconds +==================================== =============== =============================================================== Example for agent named **agent_foo**:
