Repository: incubator-zeppelin Updated Branches: refs/heads/master 42cccd157 -> a7a7bdb68
[ZEPPELIN-760] Companion object bug fix for Flink Interpretter ### What is this PR for? This implements the solution presented for the Spark interpreter in PR-780 for the Flink, Scalding, ~~and Ignite~~ interpreters. ### What type of PR is it? Improvement ### Todos * [x] - Fix line splitting * [x] - Add tests ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-658 * https://issues.apache.org/jira/browse/ZEPPELIN-747 ### How should this be tested? Add a new paragraph with each interpreter and define a class with its companion object. See bug ZEPPELIN-658 for more details ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Trevor Grant <[email protected]> Closes #794 from rawkintrevo/flink-companion-objects and squashes the following commits: 9b90890 [Trevor Grant] [ZEPPELIN-760] Dropped Ignite Fix/Tests 25640eb [Trevor Grant] [ZEPPELIN-760] Rearranged Ignite Tests ed8f527 [Trevor Grant] [ZEPPELIN-760] Rearranged Ignite Tests b37adbb [Trevor Grant] [ZEPPELIN-760] Dropped redundent Flink Test ff086c7 [Trevor Grant] [ZEPPELIN-760] Dropped duplicate Scalding Test 994312c [Trevor Grant] [ZEPPELIN-658] Companion object bug fix for Flink Interpretter Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/a7a7bdb6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/a7a7bdb6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/a7a7bdb6 Branch: refs/heads/master Commit: a7a7bdb68fc81b366088d93f4852c782c141525b Parents: 42cccd1 Author: Trevor Grant <[email protected]> Authored: Wed Mar 30 08:13:40 2016 -0500 Committer: Felix Cheung <[email protected]> Committed: Fri Apr 1 21:02:41 2016 -0700 ---------------------------------------------------------------------- .../apache/zeppelin/flink/FlinkInterpreter.java | 24 +++++++++++++++++++- .../zeppelin/flink/FlinkInterpreterTest.java | 22 +++++++++++++----- .../zeppelin/scalding/ScaldingInterpreter.java | 23 ++++++++++++++++++- .../scalding/ScaldingInterpreterTest.java | 11 +++++++++ 4 files changed, 72 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a7a7bdb6/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java ---------------------------------------------------------------------- diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java index 8a022bd..5042b96 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -249,12 +249,34 @@ public class FlinkInterpreter extends Interpreter { Code r = null; String incomplete = ""; + boolean inComment = false; + for (int l = 0; l < linesToRun.length; l++) { final String s = linesToRun[l]; // check if next line starts with "." (but not ".." or "./") it is treated as an invocation if (l + 1 < linesToRun.length) { String nextLine = linesToRun[l + 1].trim(); - if (nextLine.startsWith(".") && !nextLine.startsWith("..") && !nextLine.startsWith("./")) { + boolean continuation = false; + if (nextLine.isEmpty() + || nextLine.startsWith("//") // skip empty line or comment + || nextLine.startsWith("}") + || nextLine.startsWith("object")) { // include "} object" for Scala companion object + continuation = true; + } else if (!inComment && nextLine.startsWith("/*")) { + inComment = true; + continuation = true; + } else if (inComment && nextLine.lastIndexOf("*/") >= 0) { + inComment = false; + continuation = true; + } else if (nextLine.length() > 1 + && nextLine.charAt(0) == '.' + && nextLine.charAt(1) != '.' // ".." + && nextLine.charAt(1) != '/') { // "./" + continuation = true; + } else if (inComment) { + continuation = true; + } + if (continuation) { incomplete += s + "\n"; continue; } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a7a7bdb6/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java index 30c7be7..b6f9db6 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java +++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java @@ -50,6 +50,22 @@ public class FlinkInterpreterTest { } @Test + public void testNextLineInvocation() { + assertEquals(InterpreterResult.Code.SUCCESS, flink.interpret("\"123\"\n.toInt", context).code()); + } + + @Test + public void testNextLineComments() { + assertEquals(InterpreterResult.Code.SUCCESS, flink.interpret("\"123\"\n/*comment here\n*/.toInt", context).code()); + } + + @Test + public void testNextLineCompanionObject() { + String code = "class Counter {\nvar value: Long = 0\n}\n // comment\n\n object Counter {\n def apply(x: Long) = new Counter()\n}"; + assertEquals(InterpreterResult.Code.SUCCESS, flink.interpret(code, context).code()); + } + + @Test public void testSimpleStatement() { InterpreterResult result = flink.interpret("val a=1", context); result = flink.interpret("print(a)", context); @@ -64,12 +80,6 @@ public class FlinkInterpreterTest { } @Test - public void testNextlineInvoke() { - InterpreterResult result = flink.interpret("\"123\"\n .toInt", context); - assertEquals("res0: Int = 123\n", result.message()); - } - - @Test public void testWordCount() { flink.interpret("val text = env.fromElements(\"To be or not to be\")", context); flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a7a7bdb6/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java ---------------------------------------------------------------------- diff --git a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java index d43417e..e808e70 100644 --- a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java +++ b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java @@ -209,13 +209,34 @@ public class ScaldingInterpreter extends Interpreter { out.reset(); Code r = null; String incomplete = ""; + boolean inComment = false; for (int l = 0; l < linesToRun.length; l++) { String s = linesToRun[l]; // check if next line starts with "." (but not ".." or "./") it is treated as an invocation if (l + 1 < linesToRun.length) { String nextLine = linesToRun[l + 1].trim(); - if (nextLine.startsWith(".") && !nextLine.startsWith("..") && !nextLine.startsWith("./")) { + boolean continuation = false; + if (nextLine.isEmpty() + || nextLine.startsWith("//") // skip empty line or comment + || nextLine.startsWith("}") + || nextLine.startsWith("object")) { // include "} object" for Scala companion object + continuation = true; + } else if (!inComment && nextLine.startsWith("/*")) { + inComment = true; + continuation = true; + } else if (inComment && nextLine.lastIndexOf("*/") >= 0) { + inComment = false; + continuation = true; + } else if (nextLine.length() > 1 + && nextLine.charAt(0) == '.' + && nextLine.charAt(1) != '.' // ".." + && nextLine.charAt(1) != '/') { // "./" + continuation = true; + } else if (inComment) { + continuation = true; + } + if (continuation) { incomplete += s + "\n"; continue; } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a7a7bdb6/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java ---------------------------------------------------------------------- diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java index 198fd62..08c67da 100644 --- a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java +++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java @@ -89,6 +89,17 @@ public class ScaldingInterpreterTest { } @Test + public void testNextLineComments() { + assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n/*comment here\n*/.toInt", context).code()); + } + + @Test + public void testNextLineCompanionObject() { + String code = "class Counter {\nvar value: Long = 0\n}\n // comment\n\n object Counter {\n def apply(x: Long) = new Counter()\n}"; + assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret(code, context).code()); + } + + @Test public void testBasicIntp() { assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val a = 1\nval b = 2", context).code());
