Updated Branches: refs/heads/flume-1.4 f31b3947d -> c146e5106
FLUME-1819. ExecSource must flush events to channel periodically. (Venkatesh Sivasubramanian via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c146e510 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c146e510 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c146e510 Branch: refs/heads/flume-1.4 Commit: c146e5106c344b4ee438f4f396c3bf8544d31c66 Parents: f31b394 Author: Hari Shreedharan <[email protected]> Authored: Thu Apr 25 11:19:31 2013 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Thu Apr 25 11:20:32 2013 -0700 ---------------------------------------------------------------------- .../java/org/apache/flume/source/ExecSource.java | 104 ++++++++++++--- .../source/ExecSourceConfigurationConstants.java | 8 +- .../org/apache/flume/source/TestExecSource.java | 42 ++++++ 3 files changed, 136 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/c146e510/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java index 3c9437d..1d8d267 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.flume.Channel; @@ -34,6 +36,7 @@ import org.apache.flume.Context; import org.apache.flume.Event; import 
org.apache.flume.EventDrivenSource; import org.apache.flume.Source; +import org.apache.flume.SystemClock; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; @@ -42,6 +45,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import java.nio.charset.Charset; /** @@ -149,6 +154,7 @@ Configurable { private boolean restart; private boolean logStderr; private Integer bufferCount; + private long batchTimeout; private ExecRunnable runner; private Charset charset; @@ -159,7 +165,7 @@ Configurable { executor = Executors.newSingleThreadExecutor(); runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter, - restart, restartThrottle, logStderr, bufferCount, charset); + restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset); // FIXME: Use a callback-like executor / future to signal us upon failure. 
runnerFuture = executor.submit(runner); @@ -178,17 +184,16 @@ Configurable { @Override public void stop() { logger.info("Stopping exec source with command:{}", command); - if(runner != null) { runner.setRestart(false); runner.kill(); } + if (runnerFuture != null) { logger.debug("Stopping exec runner"); runnerFuture.cancel(true); logger.debug("Exec runner stopped"); } - executor.shutdown(); while (!executor.isTerminated()) { @@ -228,6 +233,9 @@ Configurable { bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE, ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE); + batchTimeout = context.getLong(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT, + ExecSourceConfigurationConstants.DEFAULT_BATCH_TIME_OUT); + charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET, ExecSourceConfigurationConstants.DEFAULT_CHARSET)); @@ -242,12 +250,13 @@ Configurable { public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor, SourceCounter sourceCounter, boolean restart, long restartThrottle, - boolean logStderr, int bufferCount, Charset charset) { + boolean logStderr, int bufferCount, long batchTimeout, Charset charset) { this.command = command; this.channelProcessor = channelProcessor; this.sourceCounter = sourceCounter; this.restartThrottle = restartThrottle; this.bufferCount = bufferCount; + this.batchTimeout = batchTimeout; this.restart = restart; this.logStderr = logStderr; this.charset = charset; @@ -261,15 +270,27 @@ Configurable { private volatile boolean restart; private final long restartThrottle; private final int bufferCount; + private long batchTimeout; private final boolean logStderr; private final Charset charset; private Process process = null; + private SystemClock systemClock = new SystemClock(); + private Long lastPushToChannel = systemClock.currentTimeMillis(); + ScheduledExecutorService timedFlushService; + ScheduledFuture<?> future; @Override public void run() { do { 
String exitCode = "unknown"; BufferedReader reader = null; + String line = null; + final List<Event> eventList = new ArrayList<Event>(); + + timedFlushService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat( + "timedFlushExecService" + + Thread.currentThread().getId() + "-%d").build()); try { if(shell != null) { String[] commandArgs = formulateShellCommand(shell, command); @@ -288,20 +309,39 @@ Configurable { stderrReader.setDaemon(true); stderrReader.start(); - String line = null; - List<Event> eventList = new ArrayList<Event>(); + future = timedFlushService.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + synchronized (eventList) { + if(!eventList.isEmpty() && timeout()) { + flushEventBatch(eventList); + } + } + } catch (Exception e) { + logger.error("Exception occured when processing event batch", e); + if(e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + } + } + }, + batchTimeout, batchTimeout, TimeUnit.MILLISECONDS); + while ((line = reader.readLine()) != null) { - sourceCounter.incrementEventReceivedCount(); - eventList.add(EventBuilder.withBody(line.getBytes(charset))); - if(eventList.size() >= bufferCount) { - channelProcessor.processEventBatch(eventList); - sourceCounter.addToEventAcceptedCount(eventList.size()); - eventList.clear(); + synchronized (eventList) { + sourceCounter.incrementEventReceivedCount(); + eventList.add(EventBuilder.withBody(line.getBytes(charset))); + if(eventList.size() >= bufferCount || timeout()) { + flushEventBatch(eventList); + } } } - if(!eventList.isEmpty()) { - channelProcessor.processEventBatch(eventList); - sourceCounter.addToEventAcceptedCount(eventList.size()); + + synchronized (eventList) { + if(!eventList.isEmpty()) { + flushEventBatch(eventList); + } } } catch (Exception e) { logger.error("Failed while running command: " + command, e); @@ -332,6 +372,17 @@ Configurable { } while(restart); } + private void 
flushEventBatch(List<Event> eventList){ + channelProcessor.processEventBatch(eventList); + sourceCounter.addToEventAcceptedCount(eventList.size()); + eventList.clear(); + lastPushToChannel = systemClock.currentTimeMillis(); + } + + private boolean timeout(){ + return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout; + } + private static String[] formulateShellCommand(String shell, String command) { String[] shellArgs = shell.split("\\s+"); String[] result = new String[shellArgs.length + 1]; @@ -344,8 +395,28 @@ Configurable { if(process != null) { synchronized (process) { process.destroy(); + try { - return process.waitFor(); + int exitValue = process.waitFor(); + + // Stop the Thread that flushes periodically + if (future != null) { + future.cancel(true); + } + + if (timedFlushService != null) { + timedFlushService.shutdown(); + while (!timedFlushService.isTerminated()) { + try { + timedFlushService.awaitTermination(500, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + logger.debug("Interrupted while waiting for exec executor service " + + "to stop. 
Just exiting."); + Thread.currentThread().interrupt(); + } + } + } + return exitValue; } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } @@ -392,6 +463,5 @@ Configurable { } } } - } } http://git-wip-us.apache.org/repos/asf/flume/blob/c146e510/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java index fd5a60b..957ec7f 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java @@ -18,7 +18,6 @@ */ package org.apache.flume.source; -import java.nio.charset.Charset; public class ExecSourceConfigurationConstants { @@ -47,6 +46,13 @@ public class ExecSourceConfigurationConstants { public static final int DEFAULT_BATCH_SIZE = 20; /** + * Amount of time to wait, if the buffer size was not reached, before + * the data is pushed downstream: default 3000 ms + */ + public static final String CONFIG_BATCH_TIME_OUT = "batchTimeout"; + public static final long DEFAULT_BATCH_TIME_OUT = 3000l; + + /** * Charset for reading input */ public static final String CHARSET = "charset"; http://git-wip-us.apache.org/repos/asf/flume/blob/c146e510/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java index 77e9a44..54f71a1 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java @@ 
-227,6 +227,48 @@ public class TestExecSource { } } + @Test + public void testBatchTimeout() throws InterruptedException, LifecycleException, + EventDeliveryException, IOException { + + String filePath = "/tmp/flume-execsource." + Thread.currentThread().getId(); + String eventBody = "TestMessage"; + FileOutputStream outputStream = new FileOutputStream(filePath); + + context.put(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE, "50000"); + context.put(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT, "750"); + context.put("shell", "/bin/bash -c"); + context.put("command", "tail -f " + filePath); + + Configurables.configure(source, context); + source.start(); + + Transaction transaction = channel.getTransaction(); + transaction.begin(); + + for (int lineNumber = 0; lineNumber < 3; lineNumber++) { + outputStream.write((eventBody).getBytes()); + outputStream.write(String.valueOf(lineNumber).getBytes()); + outputStream.write('\n'); + outputStream.flush(); + } + outputStream.close(); + Thread.sleep(1500); + + for(int i = 0; i < 3; i++) { + Event event = channel.take(); + assertNotNull(event); + assertNotNull(event.getBody()); + assertEquals(eventBody + String.valueOf(i), new String(event.getBody())); + } + + transaction.commit(); + transaction.close(); + source.stop(); + File file = new File(filePath); + FileUtils.forceDelete(file); + } + private void runTestShellCmdHelper(String shell, String command, String[] expectedOutput) throws InterruptedException, LifecycleException, EventDeliveryException, IOException { context.put("shell", shell);
