[
https://issues.apache.org/jira/browse/FLUME-1661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13490073#comment-13490073
]
Nitin Verma edited comment on FLUME-1661 at 11/3/12 7:15 PM:
-------------------------------------------------------------
Please refer to http://pastebin.com/9ZHNkpum (do take a look, it has the code I am
going to talk about).
I feel ExecSource should not carry too much process-handling code, so for
separation of concerns I have written separate process-handling code.
Features:-
1) The user can read the stdout or stderr of the process synchronously or
asynchronously.
To read asynchronously, the user needs to create or use an implementation of
the InputStreamRunner interface.
2) Exit codes are propagated correctly from script -> shell -> JVM.
3) By using the shell's stdin, any shell command or even whole scripts can be run.
4) FlumeProcessExecutor can be used for both ExecSource and ShellExecSource.
I believe this code belongs in flume-ng-sdk and should be called from flume-ng-core.
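To make features 1) to 3) concrete without opening the pastebin, here is a minimal sketch of the same idea using only java.lang.ProcessBuilder. It is not the FlumeProcessExecutor code: the class name ShellStdinSketch, the Result holder and the run() method are hypothetical names made up for this illustration. It assumes a Unix-like host with the given shell on the PATH.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch; NOT the FlumeProcessExecutor from the pastebin.
public class ShellStdinSketch {

  /** Exit code plus captured stdout and stderr of one shell run. */
  public static final class Result {
    public final int exitCode;
    public final String stdout;
    public final String stderr;

    Result(int exitCode, String stdout, String stderr) {
      this.exitCode = exitCode;
      this.stdout = stdout;
      this.stderr = stderr;
    }
  }

  // Reads a stream on its own thread so the child never blocks on a full pipe.
  private static Thread drain(final InputStream in, final StringBuilder sink) {
    Thread t = new Thread(() -> {
      try (BufferedReader reader = new BufferedReader(
          new InputStreamReader(in, StandardCharsets.UTF_8))) {
        String line;
        while ((line = reader.readLine()) != null) {
          sink.append(line).append('\n');
        }
      } catch (IOException e) {
        // The stream was closed while the process was going away; stop reading.
      }
    });
    t.start();
    return t;
  }

  /** Starts the shell, feeds the script through its stdin, returns the result. */
  public static Result run(String shell, String script)
      throws IOException, InterruptedException {
    Process process = new ProcessBuilder(shell).start();
    StringBuilder out = new StringBuilder();
    StringBuilder err = new StringBuilder();
    Thread outReader = drain(process.getInputStream(), out);
    Thread errReader = drain(process.getErrorStream(), err);
    // Closing stdin signals end of input, so the shell exits after the script.
    try (OutputStream stdin = process.getOutputStream()) {
      stdin.write(script.getBytes(StandardCharsets.UTF_8));
    }
    int exitCode = process.waitFor(); // exit status of the shell itself
    outReader.join();
    errReader.join();
    return new Result(exitCode, out.toString(), err.toString());
  }

  public static void main(String[] args) throws Exception {
    Result r = run("sh", "echo hello ; echo oops >&2 ; exit 101\n");
    System.out.println("exit code: " + r.exitCode);
    System.out.print("stdout: " + r.stdout);
    System.out.print("stderr: " + r.stderr);
  }
}
```

Because the script goes in through stdin, the JVM never has to tokenize it; pipes, redirects and quoting are all left to the shell, and the value returned by waitFor() is whatever the script passed to exit.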
Test cases:-
The following are some tests I ran.
1) Test exit code propagation by exiting from inside a shell.
2) Test a sequence of commands on /bin/sh:
perl -e 'print "\tHello perl!!!\n"' ;
mkdir -p /tmp/.flume-unit-test ; cd /tmp/.flume-unit-test ; ls -la >&2 ;
cat /etc/passwd | awk -F: 'BEGIN {print "Hello AWK"} {print $NF}' | sort | uniq -c >&2 ;
exit 101
3) Test another flavour of shell, /bin/tcsh.
{code:java}
package edu.nitin.testcodes;

import java.io.IOException;

import org.testng.Assert;
import org.testng.annotations.Test;

public class FlumeProcessExecutorTest {

  @Test
  public void testProcess() throws ProcessExecutionException, IOException,
      InterruptedException {
    // Exit code test with /bin/sh
    {
      System.out.println("Running...");
      final int exitWith = 174;
      final FlumeProcessExecutor flumeProcessExecutor =
          new FlumeProcessExecutor("sh");
      flumeProcessExecutor.start();
      flumeProcessExecutor.getStandardInput().write(
          ("exit " + exitWith).getBytes());
      flumeProcessExecutor.getStandardInput().close();
      int exitCode = Integer.MIN_VALUE;
      try {
        exitCode = flumeProcessExecutor.waitFor();
      } catch (final ProcessExecutionException pee) {
        pee.printStackTrace();
        try {
          exitCode = flumeProcessExecutor.exitCode();
        } catch (final ProcessExecutionException pee2) {
          pee2.printStackTrace();
          flumeProcessExecutor.stop();
        }
      }
      Assert.assertEquals(exitCode, exitWith);
      System.out.println("...Done");
    }
    // Invoking the shell (sh)
    {
      System.out.println("Running...");
      final FlumeProcessExecutor flumeProcessExecutor =
          new FlumeProcessExecutor("sh");
      flumeProcessExecutor.start();
{code}
:(
/* The Jira Java code rendering plug-in is not able to parse this
section of code, so it is commented out. */
/* To be clear, the following line does get executed in the test case: */
// flumeProcessExecutor.getStandardInput().write(("perl -e 'print \"\\tHello perl!!!\\n\"' ; "
//     + " mkdir -p /tmp/.flume-unit-test ; cd /tmp/.flume-unit-test ; ls -la >&2 ; "
//     + " cat /etc/passwd | awk -F: 'BEGIN {print \"Hello AWK\"} {print $NF}' | sort | uniq -c >&2 ;"
//     + " \n exit 101").getBytes());
{code:java}
      flumeProcessExecutor.getStandardInput().close();
      int exitCode = Integer.MIN_VALUE;
      try {
        exitCode = flumeProcessExecutor.waitFor();
      } catch (final ProcessExecutionException pee) {
        pee.printStackTrace();
        try {
          System.out.println("trying exitCode");
          exitCode = flumeProcessExecutor.exitCode();
        } catch (final ProcessExecutionException pee2) {
          pee2.printStackTrace();
          flumeProcessExecutor.stop();
        }
      }
      System.out.println("...Done with " + exitCode);
    }
    // Invoking tcsh
    {
      System.out.println("Running...");
      final FlumeProcessExecutor flumeProcessExecutor =
          new FlumeProcessExecutor("tcsh");
      flumeProcessExecutor.start();
      flumeProcessExecutor.getStandardInput().write(
          "set array = (a b c) && echo $array".getBytes());
      flumeProcessExecutor.getStandardInput().close();
      int exitCode = Integer.MIN_VALUE;
      try {
        exitCode = flumeProcessExecutor.waitFor();
      } catch (final ProcessExecutionException pee) {
        pee.printStackTrace();
        try {
          System.out.println("trying exitCode");
          exitCode = flumeProcessExecutor.exitCode();
        } catch (final ProcessExecutionException pee2) {
          pee2.printStackTrace();
          flumeProcessExecutor.stop();
        }
      }
      Assert.assertEquals(exitCode, 0);
      System.out.println("...Done");
    }
  }
}
{code}
edited because of Jira formatting issues.
> ExecSource cannot execute (little complicated..) *nix commands
> --------------------------------------------------------------
>
> Key: FLUME-1661
> URL: https://issues.apache.org/jira/browse/FLUME-1661
> Project: Flume
> Issue Type: Improvement
> Components: Sinks+Sources
> Affects Versions: v1.2.0
> Reporter: Yoonseok Woo
> Assignee: Roshan Naik
> Fix For: v1.3.0
>
> Attachments: FLUME-1661-1.patch, FLUME-1661.patch,
> FLUME-1661.patch.v2, FLUME-1661.patch.v3
>
>
> * command line parsing
> ** conf/flume.conf
> {code}
> agent.sources.source1.type = exec
> agent.sources.source1.command = tail -f /some/path/logs/exception/error.log.`date +%Y%m%d%H`
> {code}
> ** result
> {code}
> tail: /some/path/logs/exception/error.log.`date: No such file or directory
> tail: +%Y%m%d%H`: No such file or directory
> {code}
> ** needs to be improved
> {code}
> (ExecSource.java:242) String[] commandArgs = command.split("\\s+")
> {code}
> * using special character (e.g. *, `, ', ...)
> ** conf/flume.conf
> {code}
> agent.sources.source1.type = exec
> agent.sources.source1.command = tail -f /some/path/logs/exception/error.log.*
> {code}
> ** result
> {code}
> tail: /some/path/logs/exception/error.log.*: No such file or directory
> {code}
> ** needs to be improved
> {code}
> (ExecSource.java:243) process = new ProcessBuilder(commandArgs).start();
> {code}
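The failure quoted above can be reproduced without starting any process. The sketch below is only an illustration: CommandSplitSketch, naiveSplit() and viaShell() are hypothetical names, not Flume code. It shows what the whitespace split does to the backtick command, next to one possible alternative that hands the whole string to a shell through its standard -c option. Whether any of the attached patches takes that route is not something this sketch claims.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; not taken from ExecSource or from any patch.
public class CommandSplitSketch {

  // Same tokenization as the split quoted in the issue: whitespace only,
  // with no knowledge of backticks, quotes or globs.
  public static List<String> naiveSplit(String command) {
    return Arrays.asList(command.split("\\s+"));
  }

  // Assumed alternative: pass the untouched command string to a shell,
  // which then performs substitution, globbing and quoting itself.
  public static List<String> viaShell(String shell, String command) {
    return Arrays.asList(shell, "-c", command);
  }

  public static void main(String[] args) {
    String command =
        "tail -f /some/path/logs/exception/error.log.`date +%Y%m%d%H`";

    // The backtick expression is cut in two, so tail receives both halves
    // as separate file names.
    for (String arg : naiveSplit(command)) {
      System.out.println("naive arg: " + arg);
    }

    // Here the command stays in one piece as the argument after -c.
    for (String arg : viaShell("/bin/sh", command)) {
      System.out.println("shell arg: " + arg);
    }
  }
}
```

Either list could be passed to new ProcessBuilder(list). With the naive split, tail receives the two halves of the backtick expression as separate file names, which matches the two "No such file or directory" lines in the result above.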