Repository: incubator-zeppelin Updated Branches: refs/heads/master d9aaf3570 -> 223f8720b
[ZEPPELIN-287] Adds proper standard output capturing to FlinkInterpreter The `FlinkInterpreter` only captures output to `Console` and, thus, misses all output which goes to `System.out`. Some of Flink's functions, such as `DataSet[T].print()`, print their results to `System.out`. Consequently Zeppelin's interpreter misses this output. This PR fixes this behaviour by redirecting `System.out` and by setting `System.out` as the `PrintStream` for `Console`. Author: Till Rohrmann <[email protected]> Closes #288 from tillrohrmann/fixSysoutCapturing and squashes the following commits: 9e019b8 [Till Rohrmann] [ZEPPELIN-287] Adds proper standard output capturing to FlinkInterpreter Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/223f8720 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/223f8720 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/223f8720 Branch: refs/heads/master Commit: 223f8720bb07471cb6560e20212fb4134235ce92 Parents: d9aaf35 Author: Till Rohrmann <[email protected]> Authored: Mon Sep 7 15:03:52 2015 +0200 Committer: Lee moon soo <[email protected]> Committed: Wed Sep 9 07:37:43 2015 -0700 ---------------------------------------------------------------------- .../apache/zeppelin/flink/FlinkInterpreter.java | 28 +++++++++++++++----- .../zeppelin/flink/FlinkInterpreterTest.java | 17 ++++++++++++ 2 files changed, 39 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/223f8720/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java ---------------------------------------------------------------------- diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java index cb861ca..b106c7d 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -20,6 +20,9 @@ package org.apache.zeppelin.flink; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; import java.io.PrintWriter; import java.net.URL; import java.net.URLClassLoader; @@ -45,8 +48,10 @@ import scala.Console; import scala.None; import scala.Option; import scala.Some; +import scala.runtime.AbstractFunction0; import scala.tools.nsc.Settings; import scala.tools.nsc.interpreter.IMain; +import scala.tools.nsc.interpreter.Results; import scala.tools.nsc.settings.MutableSettings.BooleanSetting; import scala.tools.nsc.settings.MutableSettings.PathSetting; @@ -100,6 +105,9 @@ public class FlinkInterpreter extends Interpreter { imain = flinkIloop.intp(); + org.apache.flink.api.scala.ExecutionEnvironment env = flinkIloop.scalaEnv(); + env.getConfig().disableSysoutLogging(); + // prepare bindings imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); binder = (Map<String, Object>) getValue("_binder"); @@ -111,7 +119,7 @@ public class FlinkInterpreter extends Interpreter { imain.interpret("import org.apache.flink.api.scala._"); imain.interpret("import org.apache.flink.api.common.functions._"); - imain.bindValue("env", flinkIloop.scalaEnv()); + imain.bindValue("env", env); } private boolean localMode() { @@ -232,7 +240,7 @@ public class FlinkInterpreter extends Interpreter { } public InterpreterResult interpret(String[] lines, InterpreterContext context) { - IMain imain = flinkIloop.intp(); + final IMain imain = flinkIloop.intp(); String[] linesToRun = new String[lines.length + 1]; for (int i = 0; i < lines.length; i++) { @@ -240,13 +248,13 @@ public class FlinkInterpreter extends Interpreter { } linesToRun[lines.length] = "print(\"\")"; - Console.setOut(out); + System.setOut(new PrintStream(out)); out.reset(); Code r = null; String incomplete = ""; for (int l = 0; l < linesToRun.length; l++) { - String s = linesToRun[l]; + final String s = linesToRun[l]; // check if next line starts with "." (but not ".." or "./") it is treated as an invocation if (l + 1 < linesToRun.length) { String nextLine = linesToRun[l + 1].trim(); @@ -256,9 +264,18 @@ public class FlinkInterpreter extends Interpreter { } } + final String currentCommand = incomplete; + scala.tools.nsc.interpreter.Results.Result res = null; try { - res = imain.interpret(incomplete + s); + res = Console.withOut( + System.out, + new AbstractFunction0<Results.Result>() { + @Override + public Results.Result apply() { + return imain.interpret(currentCommand + s); + } + }); } catch (Exception e) { logger.info("Interpreter exception", e); return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e)); @@ -328,5 +345,4 @@ public class FlinkInterpreter extends Interpreter { static final String toString(Object o) { return (o instanceof String) ? (String) o : ""; } - } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/223f8720/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java index d0eda26..3168f04 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java +++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java @@ -17,8 +17,10 @@ */ package org.apache.zeppelin.flink; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import java.util.Arrays; import java.util.Properties; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -55,6 +57,13 @@ public class FlinkInterpreterTest { } @Test + public void testSimpleStatementWithSystemOutput() { + InterpreterResult result = flink.interpret("val a=1", context); + result = flink.interpret("System.out.print(a)", context); + assertEquals("1", result.message()); + } + + @Test public void testNextlineInvoke() { InterpreterResult result = flink.interpret("\"123\"\n .toInt", context); assertEquals("res0: Int = 123\n", result.message()); @@ -66,5 +75,13 @@ public class FlinkInterpreterTest { flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context); InterpreterResult result = flink.interpret("counts.print()", context); assertEquals(Code.SUCCESS, result.code()); + + String[] expectedCounts = {"(to,2)", "(be,2)", "(or,1)", "(not,1)"}; + Arrays.sort(expectedCounts); + + String[] counts = result.message().split("\n"); + Arrays.sort(counts); + + assertArrayEquals(expectedCounts, counts); } }
