Repository: flume Updated Branches: refs/heads/flume-1.7 589bfa202 -> a7ffb2701
FLUME-2712. Optional channel errors slows down the Source to Main channel event rate (Johny Rufus via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a7ffb270 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a7ffb270 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a7ffb270 Branch: refs/heads/flume-1.7 Commit: a7ffb27014f5808633f829b5f1e72a0290bd35a5 Parents: 589bfa2 Author: Hari Shreedharan <[email protected]> Authored: Fri Oct 30 16:36:40 2015 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Fri Oct 30 16:41:50 2015 -0700 ---------------------------------------------------------------------- .../apache/flume/channel/ChannelProcessor.java | 146 +++++++------------ .../flume/channel/TestChannelProcessor.java | 69 +++++++++ 2 files changed, 122 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/a7ffb270/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java index 1cce137..f2612a6 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java @@ -20,10 +20,14 @@ package org.apache.flume.channel; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import org.apache.flume.Channel; import org.apache.flume.ChannelException; @@ -57,6 +61,7 @@ public class ChannelProcessor implements Configurable { private final ChannelSelector selector; private final InterceptorChain interceptorChain; + private ExecutorService execService; public ChannelProcessor(ChannelSelector selector) { this.selector = selector; @@ -77,6 +82,8 @@ public class ChannelProcessor implements Configurable { */ @Override public void configure(Context context) { + this.execService = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("OptionalChannelProcessorThread").build()); configureInterceptors(context); } @@ -153,7 +160,6 @@ public class ChannelProcessor implements Configurable { for (Event event : events) { List<Channel> reqChannels = selector.getRequiredChannels(event); - for (Channel ch : reqChannels) { List<Event> eventQueue = reqChannelQueue.get(ch); if (eventQueue == null) { @@ -164,74 +170,26 @@ public class ChannelProcessor implements Configurable { } List<Channel> optChannels = selector.getOptionalChannels(event); - for (Channel ch: optChannels) { List<Event> eventQueue = optChannelQueue.get(ch); if (eventQueue == null) { eventQueue = new ArrayList<Event>(); optChannelQueue.put(ch, eventQueue); } - eventQueue.add(event); } } // Process required channels for (Channel reqChannel : reqChannelQueue.keySet()) { - Transaction tx = reqChannel.getTransaction(); - Preconditions.checkNotNull(tx, "Transaction object must not be null"); - try { - tx.begin(); - - List<Event> batch = reqChannelQueue.get(reqChannel); - - for (Event event : batch) { - reqChannel.put(event); - } - - tx.commit(); - } catch (Throwable t) { - tx.rollback(); - if (t instanceof Error) { - LOG.error("Error while writing to required channel: " + - reqChannel, t); - throw (Error) t; - } else { - throw new ChannelException("Unable to put batch on required " + - "channel: " + reqChannel, t); - } - } finally { - if (tx != null) { - tx.close(); - } - } + List<Event> batch = reqChannelQueue.get(reqChannel); + executeChannelTransaction(reqChannel, batch, false); } // Process optional channels for (Channel optChannel : optChannelQueue.keySet()) { - Transaction tx = optChannel.getTransaction(); - Preconditions.checkNotNull(tx, "Transaction object must not be null"); - try { - tx.begin(); - - List<Event> batch = optChannelQueue.get(optChannel); - - for (Event event : batch ) { - optChannel.put(event); - } - - tx.commit(); - } catch (Throwable t) { - tx.rollback(); - LOG.error("Unable to put batch on optional channel: " + optChannel, t); - if (t instanceof Error) { - throw (Error) t; - } - } finally { - if (tx != null) { - tx.close(); - } - } + List<Event> batch = optChannelQueue.get(optChannel); + execService.submit(new OptionalChannelTransactionRunnable(optChannel, batch)); } } @@ -253,57 +211,59 @@ public class ChannelProcessor implements Configurable { if (event == null) { return; } + List<Event> events = new ArrayList<Event>(1); + events.add(event); // Process required channels List<Channel> requiredChannels = selector.getRequiredChannels(event); for (Channel reqChannel : requiredChannels) { - Transaction tx = reqChannel.getTransaction(); - Preconditions.checkNotNull(tx, "Transaction object must not be null"); - try { - tx.begin(); - - reqChannel.put(event); - - tx.commit(); - } catch (Throwable t) { - tx.rollback(); - if (t instanceof Error) { - LOG.error("Error while writing to required channel: " + - reqChannel, t); - throw (Error) t; - } else { - throw new ChannelException("Unable to put event on required " + - "channel: " + reqChannel, t); - } - } finally { - if (tx != null) { - tx.close(); - } - } + executeChannelTransaction(reqChannel, events, false); } // Process optional channels List<Channel> optionalChannels = selector.getOptionalChannels(event); for (Channel optChannel : optionalChannels) { - Transaction tx = null; - try { - tx = optChannel.getTransaction(); - tx.begin(); + execService.submit(new OptionalChannelTransactionRunnable(optChannel, events)); + } + } - optChannel.put(event); + private static void executeChannelTransaction(Channel channel, List<Event> batch, boolean isOptional) { + Transaction tx = channel.getTransaction(); + Preconditions.checkNotNull(tx, "Transaction object must not be null"); + try { + tx.begin(); - tx.commit(); - } catch (Throwable t) { - tx.rollback(); - LOG.error("Unable to put event on optional channel: " + optChannel, t); - if (t instanceof Error) { - throw (Error) t; - } - } finally { - if (tx != null) { - tx.close(); - } + for (Event event : batch) { + channel.put(event); } + + tx.commit(); + } catch (Throwable t) { + tx.rollback(); + if (t instanceof Error) { + LOG.error("Error while writing to channel: " + + channel, t); + throw (Error) t; + } else if(!isOptional) { + throw new ChannelException("Unable to put batch on required " + + "channel: " + channel, t); + } + } finally { + tx.close(); + } + } + + private static class OptionalChannelTransactionRunnable implements Runnable { + private Channel channel; + private List<Event> events; + + OptionalChannelTransactionRunnable(Channel channel, List<Event> events) { + this.channel = channel; + this.events = events; + } + + public void run() { + executeChannelTransaction(channel, events, true); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/a7ffb270/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java index 0656596..924c998 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java +++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java @@ -20,11 +20,16 @@ package org.apache.flume.channel; import com.google.common.base.Charsets; import com.google.common.collect.Lists; + +import java.util.ArrayList; import java.util.List; import org.apache.flume.Channel; import org.apache.flume.ChannelException; import org.apache.flume.ChannelSelector; import org.apache.flume.Event; +import org.apache.flume.Transaction; +import org.apache.flume.Context; +import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; import org.junit.Assert; import org.junit.Test; @@ -79,4 +84,68 @@ public class TestChannelProcessor { Assert.assertTrue("Must throw NPE", threw); } + /* + * Test delivery to optional and required channels + * Test both processEvent and processEventBatch + */ + @Test + public void testRequiredAndOptionalChannels() { + Context context = new Context(); + ArrayList<Channel> channels = new ArrayList<Channel>(); + for(int i = 0; i < 4; i++) { + Channel ch = new MemoryChannel(); + ch.setName("ch"+i); + Configurables.configure(ch, context); + channels.add(ch); + } + + ChannelSelector selector = new ReplicatingChannelSelector(); + selector.setChannels(channels); + + context = new Context(); + context.put(ReplicatingChannelSelector.CONFIG_OPTIONAL, "ch2 ch3"); + Configurables.configure(selector, context); + + ChannelProcessor processor = new ChannelProcessor(selector); + context = new Context(); + Configurables.configure(processor, context); + + + Event event1 = EventBuilder.withBody("event 1", Charsets.UTF_8); + processor.processEvent(event1); + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + } + + for(Channel channel : channels) { + Transaction transaction = channel.getTransaction(); + transaction.begin(); + Event event_ch = channel.take(); + Assert.assertEquals(event1, event_ch); + transaction.commit(); + transaction.close(); + } + + List<Event> events = Lists.newArrayList(); + for(int i = 0; i < 100; i ++) { + events.add(EventBuilder.withBody("event "+i, Charsets.UTF_8)); + } + processor.processEventBatch(events); + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + } + for(Channel channel : channels) { + Transaction transaction = channel.getTransaction(); + transaction.begin(); + for(int i = 0; i < 100; i ++) { + Event event_ch = channel.take(); + Assert.assertNotNull(event_ch); + } + transaction.commit(); + transaction.close(); + } + } + }
