Updated Branches: refs/heads/trunk b28b87b58 -> 13b8252bd
FLUME-1661: ExecSource cannot execute complex *nix commands (Roshan Naik via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/13b8252b Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/13b8252b Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/13b8252b Branch: refs/heads/trunk Commit: 13b8252bdeb838c606f4453bdf757fb2a1101eb8 Parents: b28b87b Author: Brock Noland <[email protected]> Authored: Mon Mar 11 14:16:55 2013 -0500 Committer: Brock Noland <[email protected]> Committed: Mon Mar 11 14:16:55 2013 -0500 ---------------------------------------------------------------------- .../java/org/apache/flume/source/ExecSource.java | 44 +++-- .../source/ExecSourceConfigurationConstants.java | 5 + .../org/apache/flume/source/TestExecSource.java | 150 +++++++++++---- flume-ng-core/src/test/resources/test_command.txt | 3 + flume-ng-doc/sphinx/FlumeUserGuide.rst | 12 ++ pom.xml | 1 + 6 files changed, 165 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/13b8252b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java index 495b03f..8e687f2 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java @@ -140,7 +140,7 @@ Configurable { private static final Logger logger = LoggerFactory .getLogger(ExecSource.class); - + private String shell; private String command; private CounterGroup counterGroup; private ExecutorService executor; @@ -159,7 +159,7 @@ Configurable { executor = Executors.newSingleThreadExecutor(); counterGroup = new CounterGroup(); - runner = new 
ExecRunnable(command, getChannelProcessor(), counterGroup, + runner = new ExecRunnable(shell, command, getChannelProcessor(), counterGroup, restart, restartThrottle, logStderr, bufferCount, charset); // FIXME: Use a callback-like executor / future to signal us upon failure. @@ -229,11 +229,13 @@ Configurable { charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET, ExecSourceConfigurationConstants.DEFAULT_CHARSET)); + + shell = context.getString(ExecSourceConfigurationConstants.CONFIG_SHELL, null); } private static class ExecRunnable implements Runnable { - public ExecRunnable(String command, ChannelProcessor channelProcessor, + public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor, CounterGroup counterGroup, boolean restart, long restartThrottle, boolean logStderr, int bufferCount, Charset charset) { this.command = command; @@ -244,16 +246,18 @@ Configurable { this.restart = restart; this.logStderr = logStderr; this.charset = charset; + this.shell = shell; } - private String command; - private ChannelProcessor channelProcessor; - private CounterGroup counterGroup; + private final String shell; + private final String command; + private final ChannelProcessor channelProcessor; + private final CounterGroup counterGroup; private volatile boolean restart; - private long restartThrottle; - private int bufferCount; - private boolean logStderr; - private Charset charset; + private final long restartThrottle; + private final int bufferCount; + private final boolean logStderr; + private final Charset charset; private Process process = null; @Override @@ -262,8 +266,13 @@ Configurable { String exitCode = "unknown"; BufferedReader reader = null; try { - String[] commandArgs = command.split("\\s+"); - process = new ProcessBuilder(commandArgs).start(); + if(shell != null) { + String[] commandArgs = formulateShellCommand(shell, command); + process = Runtime.getRuntime().exec(commandArgs); + } else { + String[] 
commandArgs = command.split("\\s+"); + process = new ProcessBuilder(commandArgs).start(); + } reader = new BufferedReader( new InputStreamReader(process.getInputStream(), charset)); @@ -315,6 +324,15 @@ Configurable { } } while(restart); } + + private static String[] formulateShellCommand(String shell, String command) { + String[] shellArgs = shell.split("\\s+"); + String[] result = new String[shellArgs.length + 1]; + System.arraycopy(shellArgs, 0, result, 0, shellArgs.length); + result[shellArgs.length] = command; + return result; + } + public int kill() { if(process != null) { synchronized (process) { @@ -336,10 +354,12 @@ Configurable { private static class StderrReader extends Thread { private BufferedReader input; private boolean logStderr; + protected StderrReader(BufferedReader input, boolean logStderr) { this.input = input; this.logStderr = logStderr; } + @Override public void run() { try { http://git-wip-us.apache.org/repos/asf/flume/blob/13b8252b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java index 1b35b01..fd5a60b 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java @@ -51,4 +51,9 @@ public class ExecSourceConfigurationConstants { */ public static final String CHARSET = "charset"; public static final String DEFAULT_CHARSET = "UTF-8"; + + /** + * Optional shell/command processor used to run command + */ + public static final String CONFIG_SHELL = "shell"; } http://git-wip-us.apache.org/repos/asf/flume/blob/13b8252b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java 
---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java index 8bcf320..7c573f6 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java @@ -22,13 +22,9 @@ package org.apache.flume.source; import static org.junit.Assert.*; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.List; -import java.util.Random; +import java.io.*; +import java.nio.charset.Charset; +import java.util.*; import java.util.regex.Pattern; import org.apache.commons.io.FileUtils; @@ -43,40 +39,48 @@ import org.apache.flume.channel.MemoryChannel; import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.conf.Configurables; import org.apache.flume.lifecycle.LifecycleException; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.*; import com.google.common.base.Charsets; import com.google.common.collect.Lists; +import com.google.common.io.Files; public class TestExecSource { private AbstractSource source; + private Channel channel = new MemoryChannel(); + + private Context context = new Context(); + + private ChannelSelector rcs = new ReplicatingChannelSelector(); + @Before public void setUp() { + context.put("keep-alive", "1"); + context.put("capacity", "1000"); + context.put("transactionCapacity", "1000"); + Configurables.configure(channel, context); + rcs.setChannels(Lists.newArrayList(channel)); + source = new ExecSource(); + source.setChannelProcessor(new ChannelProcessor(rcs)); + } + + @After + public void tearDown() { + source.stop(); } @Test public void testProcess() throws InterruptedException, LifecycleException, 
EventDeliveryException, IOException { - Channel channel = new MemoryChannel(); - Context context = new Context(); - context.put("command", "cat /etc/passwd"); context.put("keep-alive", "1"); context.put("capacity", "1000"); context.put("transactionCapacity", "1000"); Configurables.configure(source, context); - Configurables.configure(channel, context); - - ChannelSelector rcs = new ReplicatingChannelSelector(); - rcs.setChannels(Lists.newArrayList(channel)); - - source.setChannelProcessor(new ChannelProcessor(rcs)); source.start(); Transaction transaction = channel.getTransaction(); @@ -96,8 +100,6 @@ public class TestExecSource { transaction.commit(); transaction.close(); - source.stop(); - File file1 = new File("/tmp/flume-execsource." + Thread.currentThread().getId()); File file2 = new File("/etc/passwd"); @@ -106,25 +108,105 @@ public class TestExecSource { FileUtils.forceDelete(file1); } + @Test + public void testShellCommandSimple() throws InterruptedException, LifecycleException, + EventDeliveryException, IOException { + runTestShellCmdHelper("/bin/sh -c", "seq 5" + , new String[]{"1","2","3","4","5" } ); + } @Test - public void testRestart() throws InterruptedException, LifecycleException, + public void testShellCommandBackTicks() throws InterruptedException, LifecycleException, EventDeliveryException, IOException { + // command with backticks + runTestShellCmdHelper("/bin/sh -c", "echo `seq 5`" , new String[]{"1 2 3 4 5" } ); + runTestShellCmdHelper("/bin/sh -c", "echo $(seq 5)" , new String[]{"1 2 3 4 5" } ); + } - Channel channel = new MemoryChannel(); - Context context = new Context(); + @Test + public void testShellCommandComplex() throws InterruptedException, LifecycleException, + EventDeliveryException, IOException { + // command with wildcards & pipes + String[] expected = {"1234", "abcd", "ijk", "xyz", "zzz"}; + + // pipes + runTestShellCmdHelper("/bin/sh -c", "echo zzz 1234 xyz abcd ijk | xargs -n1 echo | sort -f" + , expected ); + } + + @Test + 
public void testShellCommandScript() throws InterruptedException, LifecycleException, + EventDeliveryException, IOException { + // mini script + runTestShellCmdHelper("/bin/sh -c", "for i in {1..5}; do echo $i;done" + , new String[]{"1","2","3","4","5" } ); + // shell arithmetic + runTestShellCmdHelper("/bin/sh -c", "if ((2+2>3)); then echo good; else echo not good; fi" , new String[]{"good"} ); + } + + @Test + public void testShellCommandEmbeddingAndEscaping() throws InterruptedException, LifecycleException, + EventDeliveryException, IOException { + System.out.println( "######### PWD = " + new java.io.File( "." ).getCanonicalPath() ); + // mini script + BufferedReader reader = new BufferedReader(new FileReader("src/test/resources/test_command.txt") ); + try { + String command1 = reader.readLine(); + Assert.assertNotNull(command1); + String[] output1 = new String[] {"'1'", "\"2\"", "\\3", "\\4"}; + runTestShellCmdHelper("/bin/sh -c", command1 , output1); + String command2 = reader.readLine(); + Assert.assertNotNull(command2); + String[] output2 = new String[]{"1","2","3","4","5" }; + runTestShellCmdHelper("/bin/sh -c", command2 , output2); + String command3 = reader.readLine(); + Assert.assertNotNull(command3); + String[] output3 = new String[]{"2","3","4","5","6" }; + runTestShellCmdHelper("/bin/sh -c", command3 , output3); + } finally { + reader.close(); + } + } + + + private void runTestShellCmdHelper(String shell, String command, String[] expectedOutput) + throws InterruptedException, LifecycleException, EventDeliveryException, IOException { + context.put("shell", shell); + context.put("command", command); + Configurables.configure(source, context); + source.start(); + File outputFile = File.createTempFile("flumeExecSourceTest_", ""); + FileOutputStream outputStream = new FileOutputStream(outputFile); + Transaction transaction = channel.getTransaction(); + transaction.begin(); + try { + Event event; + while ((event = channel.take()) != null) { + 
outputStream.write(event.getBody()); + outputStream.write('\n'); + } + outputStream.close(); + transaction.commit(); + List<String> output = Files.readLines(outputFile, Charset.defaultCharset()); + + Assert.assertArrayEquals(expectedOutput, output.toArray(new String[]{})); + } finally { + FileUtils.forceDelete(outputFile); + transaction.close(); + source.stop(); + } + } + + + @Test + public void testRestart() throws InterruptedException, LifecycleException, + EventDeliveryException, IOException { context.put(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE, "10"); context.put(ExecSourceConfigurationConstants.CONFIG_RESTART, "true"); context.put("command", "echo flume"); Configurables.configure(source, context); - Configurables.configure(channel, context); - - ChannelSelector rcs = new ReplicatingChannelSelector(); - rcs.setChannels(Lists.newArrayList(channel)); - - source.setChannelProcessor(new ChannelProcessor(rcs)); source.start(); Transaction transaction = channel.getTransaction(); @@ -182,19 +264,11 @@ public class TestExecSource { String command = "sleep " + seconds; Pattern pattern = Pattern.compile("\b" + command + "\b"); - Channel channel = new MemoryChannel(); - Context context = new Context(); - context.put(ExecSourceConfigurationConstants.CONFIG_RESTART, "false"); context.put("command", command); Configurables.configure(source, context); - Configurables.configure(channel, context); - ChannelSelector rcs = new ReplicatingChannelSelector(); - rcs.setChannels(Lists.newArrayList(channel)); - - source.setChannelProcessor(new ChannelProcessor(rcs)); source.start(); Thread.sleep(1000L); source.stop(); http://git-wip-us.apache.org/repos/asf/flume/blob/13b8252b/flume-ng-core/src/test/resources/test_command.txt ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/resources/test_command.txt b/flume-ng-core/src/test/resources/test_command.txt new file mode 100644 index 0000000..81114c2 --- /dev/null +++ 
b/flume-ng-core/src/test/resources/test_command.txt @@ -0,0 +1,3 @@ +echo "'1'"; echo "\"2\""; echo "\\3"; echo "\4" +for i in {1..5}; do echo $i; done +for i in `seq 5`; do echo $i; done | awk ' { print $1 + 1 } ' http://git-wip-us.apache.org/repos/asf/flume/blob/13b8252b/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index f9088f9..d72c965 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -712,6 +712,7 @@ Property Name Default Description **channels** -- **type** -- The component type name, needs to be ``exec`` **command** -- The command to execute +shell -- A shell invocation used to run the command, e.g. /bin/sh -c. Required only for commands relying on shell features such as wildcards, backticks, pipes, etc. restartThrottle 10000 Amount of time (in millis) to wait before attempting a restart restart false Whether the executed cmd should be restarted if it dies logStdErr false Whether the command's stderr should be logged @@ -755,6 +756,17 @@ Example for agent named a1: a1.sources.r1.command = tail -F /var/log/secure a1.sources.r1.channels = c1 +The 'shell' config is used to invoke the 'command' through a command shell (such as Bash +or PowerShell). The 'command' is passed as an argument to 'shell' for execution. This +allows the 'command' to use features of the shell such as wildcards, backticks, pipes, +loops, conditionals, etc. In the absence of the 'shell' config, the 'command' will be +invoked directly. Common values for 'shell' are '/bin/sh -c', '/bin/ksh -c', +'cmd /c', 'powershell -Command', etc. The 'command' may only use syntax that the chosen shell supports: for example, brace expansion such as {1..5} works under '/bin/bash -c' but not under a strictly POSIX '/bin/sh -c'. +.. 
code-block:: properties + agent_foo.sources.tailsource-1.type = exec + agent_foo.sources.tailsource-1.shell = /bin/bash -c + agent_foo.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done + JMS Source ~~~~~~~~~~~ http://git-wip-us.apache.org/repos/asf/flume/blob/13b8252b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b1f8af2..00a7e61 100644 --- a/pom.xml +++ b/pom.xml @@ -534,6 +534,7 @@ limitations under the License. <exclude>**/*.avsc</exclude> <exclude>**/*.avro</exclude> <exclude>**/docs/**</exclude> + <exclude>**/test/resources/test_command.txt</exclude> </excludes> </configuration> </execution>
