Updated Branches: refs/heads/flume-1.3.0 80176f340 -> dcc8a0803
FLUME-1590. ExecSource should kill child process when it stops. (Brock Noland via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/dcc8a080 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/dcc8a080 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/dcc8a080 Branch: refs/heads/flume-1.3.0 Commit: dcc8a080317e83e7207bc6f3ec958dc83b3d0c77 Parents: 80176f3 Author: Hari Shreedharan <[email protected]> Authored: Mon Sep 24 11:51:25 2012 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Mon Sep 24 11:52:12 2012 -0700 ---------------------------------------------------------------------- .../java/org/apache/flume/source/ExecSource.java | 31 ++++-- .../org/apache/flume/source/TestExecSource.java | 88 +++++++++++++++ 2 files changed, 109 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/dcc8a080/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java index 155f0e2..46f672f 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java @@ -179,6 +179,7 @@ Configurable { if(runner != null) { runner.setRestart(false); + runner.kill(); } if (runnerFuture != null) { logger.debug("Stopping exec runner"); @@ -246,13 +247,13 @@ Configurable { private long restartThrottle; private int bufferCount; private boolean logStderr; + private Process process = null; @Override public void run() { do { String exitCode = "unknown"; BufferedReader reader = null; - Process process = null; try { String[] commandArgs = command.split("\\s+"); process = new ProcessBuilder(commandArgs).start(); @@ -292,25 +293,35 @@ Configurable { logger.error("Failed to close reader for exec source", ex); } } - if(process != null) { - process.destroy(); - try { - exitCode = String.valueOf(process.waitFor()); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } + exitCode = String.valueOf(kill()); } if(restart) { - logger.info("Restarting in {}ms, exit code {}", restartThrottle, exitCode); + logger.info("Restarting in {}ms, exit code {}", restartThrottle, + exitCode); try { Thread.sleep(restartThrottle); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } + } else { + logger.info("Command [" + command + "] exited with " + exitCode); } } while(restart); } + public int kill() { + if(process != null) { + synchronized (process) { + process.destroy(); + try { + return process.waitFor(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + return Integer.MIN_VALUE; + } + return Integer.MIN_VALUE / 2; + } public void setRestart(boolean restart) { this.restart = restart; } http://git-wip-us.apache.org/repos/asf/flume/blob/dcc8a080/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java index 615f2a3..8bcf320 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java @@ -22,9 +22,14 @@ package org.apache.flume.source; import static org.junit.Assert.*; +import java.io.BufferedReader; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.Random; +import java.util.regex.Pattern; import org.apache.commons.io.FileUtils; import org.apache.flume.Channel; @@ -143,4 +148,87 @@ public class TestExecSource { source.stop(); } + + + /** + * Tests to make sure that the shutdown mechanism works. There are races + * in this test if the system has another sleep command running with the + * same sleep interval but we pick rarely used sleep times and make an + * effort to detect if our sleep time is already in use. Note the + * ps -ef command should work on both macs and linux. + */ + @Test + public void testShutdown() throws Exception { + int seconds = 272; // pick a rare sleep time + + // now find one that is not in use + boolean searchForCommand = true; + while(searchForCommand) { + searchForCommand = false; + String command = "sleep " + seconds; + Pattern pattern = Pattern.compile("\b" + command + "\b"); + for(String line : exec("ps -ef")) { + if(pattern.matcher(line).find()) { + seconds++; + searchForCommand = true; + break; + } + } + } + + // yes in the mean time someone could use our sleep time + // but this should be a fairly rare scenerio + + String command = "sleep " + seconds; + Pattern pattern = Pattern.compile("\b" + command + "\b"); + + Channel channel = new MemoryChannel(); + Context context = new Context(); + + context.put(ExecSourceConfigurationConstants.CONFIG_RESTART, "false"); + + context.put("command", command); + Configurables.configure(source, context); + Configurables.configure(channel, context); + + ChannelSelector rcs = new ReplicatingChannelSelector(); + rcs.setChannels(Lists.newArrayList(channel)); + + source.setChannelProcessor(new ChannelProcessor(rcs)); + source.start(); + Thread.sleep(1000L); + source.stop(); + Thread.sleep(1000L); + for(String line : exec("ps -ef")) { + if(pattern.matcher(line).find()) { + Assert.fail("Found [" + line + "]"); + } + } + } + + private static List<String> exec(String command) throws Exception { + String[] commandArgs = command.split("\\s+"); + Process process = new ProcessBuilder(commandArgs).start(); + BufferedReader reader = null; + try { + reader = new BufferedReader( + new InputStreamReader(process.getInputStream())); + List<String> result = Lists.newArrayList(); + String line; + while((line = reader.readLine()) != null) { + result.add(line); + } + return result; + } finally { + process.destroy(); + if(reader != null) { + reader.close(); + } + int exit = process.waitFor(); + if(exit != 0) { + throw new IllegalStateException("Command [" + command + "] exited with " + + exit); + } + } + } }
