document executor
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/aa6f3e56 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/aa6f3e56 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/aa6f3e56 Branch: refs/heads/master Commit: aa6f3e561dbe434dd234bb265dcd3fbf6d051ae7 Parents: a93bb17 Author: sblackmon <[email protected]> Authored: Tue Apr 1 15:15:45 2014 -0500 Committer: sblackmon <[email protected]> Committed: Tue Apr 1 15:15:45 2014 -0500 ---------------------------------------------------------------------- .../streams/pig/StreamsProcessDocumentExec.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aa6f3e56/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java index 469aa3f..112ed45 100644 --- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java +++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java @@ -36,31 +36,46 @@ public class StreamsProcessDocumentExec extends EvalFunc<String> { ObjectMapper mapper = StreamsJacksonMapper.getInstance(); public StreamsProcessDocumentExec(String... execArgs) throws ClassNotFoundException{ + System.out.println("A"); Preconditions.checkNotNull(execArgs); + System.out.println("B"); Preconditions.checkArgument(execArgs.length > 0); + System.out.println("C"); String processorFullName = execArgs[0]; + System.out.println("D"); Preconditions.checkNotNull(processorFullName); + System.out.println("E"); streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(processorFullName)); + System.out.println("F"); streamsProcessor.prepare(null); + System.out.println("G"); } @Override public String exec(Tuple input) throws IOException { + System.out.println("H"); Preconditions.checkNotNull(streamsProcessor); Preconditions.checkNotNull(input); Preconditions.checkArgument(input.size() == 1); + System.out.println("I"); String document = (String) input.get(0); Preconditions.checkNotNull(document); + System.out.println(document); + StreamsDatum entry = new StreamsDatum(document); Preconditions.checkNotNull(entry); + System.out.println(entry); + List<StreamsDatum> resultSet = streamsProcessor.process(entry); + System.out.println(resultSet); + Object resultDoc = null; for( StreamsDatum resultDatum : resultSet ) { resultDoc = resultDatum.getDocument(); @@ -68,6 +83,8 @@ public class StreamsProcessDocumentExec extends EvalFunc<String> { Preconditions.checkNotNull(resultDoc); + System.out.println(resultDoc); + if( resultDoc instanceof String ) return (String) resultDoc; else if( resultDoc instanceof ObjectNode)
