Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 42cccd157 -> a7a7bdb68


[ZEPPELIN-760] Companion object bug fix for Flink Interpretter

### What is this PR for?
This implements the solution presented for the Spark interpreter in PR-780 for 
the Flink, Scalding, ~~and Ignite~~ interpreters.

### What type of PR is it?
Improvement

### Todos
* [x] - Fix line splitting
* [x] - Add tests

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-658
* https://issues.apache.org/jira/browse/ZEPPELIN-747

### How should this be tested?
Add a new paragraph with each interpreter and define a class with its companion 
object.
See bug ZEPPELIN-658 for more details

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Trevor Grant <[email protected]>

Closes #794 from rawkintrevo/flink-companion-objects and squashes the following 
commits:

9b90890 [Trevor Grant] [ZEPPELIN-760] Dropped Ignite Fix/Tests
25640eb [Trevor Grant] [ZEPPELIN-760] Rearranged Ignite Tests
ed8f527 [Trevor Grant] [ZEPPELIN-760] Rearranged Ignite Tests
b37adbb [Trevor Grant] [ZEPPELIN-760] Dropped redundent Flink Test
ff086c7 [Trevor Grant] [ZEPPELIN-760] Dropped duplicate Scalding Test
994312c [Trevor Grant] [ZEPPELIN-658] Companion object bug fix for Flink 
Interpretter


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/a7a7bdb6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/a7a7bdb6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/a7a7bdb6

Branch: refs/heads/master
Commit: a7a7bdb68fc81b366088d93f4852c782c141525b
Parents: 42cccd1
Author: Trevor Grant <[email protected]>
Authored: Wed Mar 30 08:13:40 2016 -0500
Committer: Felix Cheung <[email protected]>
Committed: Fri Apr 1 21:02:41 2016 -0700

----------------------------------------------------------------------
 .../apache/zeppelin/flink/FlinkInterpreter.java | 24 +++++++++++++++++++-
 .../zeppelin/flink/FlinkInterpreterTest.java    | 22 +++++++++++++-----
 .../zeppelin/scalding/ScaldingInterpreter.java  | 23 ++++++++++++++++++-
 .../scalding/ScaldingInterpreterTest.java       | 11 +++++++++
 4 files changed, 72 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a7a7bdb6/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
----------------------------------------------------------------------
diff --git 
a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java 
b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index 8a022bd..5042b96 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -249,12 +249,34 @@ public class FlinkInterpreter extends Interpreter {
     Code r = null;
 
     String incomplete = "";
+    boolean inComment = false;
+
     for (int l = 0; l < linesToRun.length; l++) {
       final String s = linesToRun[l];
       // check if next line starts with "." (but not ".." or "./") it is 
treated as an invocation
       if (l + 1 < linesToRun.length) {
         String nextLine = linesToRun[l + 1].trim();
-        if (nextLine.startsWith(".") && !nextLine.startsWith("..") && 
!nextLine.startsWith("./")) {
+        boolean continuation = false;
+        if (nextLine.isEmpty()
+                || nextLine.startsWith("//")         // skip empty line or 
comment
+                || nextLine.startsWith("}")
+                || nextLine.startsWith("object")) { // include "} object" for 
Scala companion object
+          continuation = true;
+        } else if (!inComment && nextLine.startsWith("/*")) {
+          inComment = true;
+          continuation = true;
+        } else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
+          inComment = false;
+          continuation = true;
+        } else if (nextLine.length() > 1
+                && nextLine.charAt(0) == '.'
+                && nextLine.charAt(1) != '.'     // ".."
+                && nextLine.charAt(1) != '/') {  // "./"
+          continuation = true;
+        } else if (inComment) {
+          continuation = true;
+        }
+        if (continuation) {
           incomplete += s + "\n";
           continue;
         }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a7a7bdb6/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java 
b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index 30c7be7..b6f9db6 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -50,6 +50,22 @@ public class FlinkInterpreterTest {
   }
 
   @Test
+  public void testNextLineInvocation() {
+    assertEquals(InterpreterResult.Code.SUCCESS, 
flink.interpret("\"123\"\n.toInt", context).code());
+  }
+
+  @Test
+  public void testNextLineComments() {
+    assertEquals(InterpreterResult.Code.SUCCESS, 
flink.interpret("\"123\"\n/*comment here\n*/.toInt", context).code());
+  }
+
+  @Test
+  public void testNextLineCompanionObject() {
+    String code = "class Counter {\nvar value: Long = 0\n}\n // comment\n\n 
object Counter {\n def apply(x: Long) = new Counter()\n}";
+    assertEquals(InterpreterResult.Code.SUCCESS, flink.interpret(code, 
context).code());
+  }
+
+  @Test
   public void testSimpleStatement() {
     InterpreterResult result = flink.interpret("val a=1", context);
     result = flink.interpret("print(a)", context);
@@ -64,12 +80,6 @@ public class FlinkInterpreterTest {
   }
 
   @Test
-  public void testNextlineInvoke() {
-    InterpreterResult result = flink.interpret("\"123\"\n  .toInt", context);
-    assertEquals("res0: Int = 123\n", result.message());    
-  }
-
-  @Test
   public void testWordCount() {
     flink.interpret("val text = env.fromElements(\"To be or not to be\")", 
context);
     flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") 
}.map { (_, 1) }.groupBy(0).sum(1)", context);

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a7a7bdb6/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
----------------------------------------------------------------------
diff --git 
a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java 
b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
index d43417e..e808e70 100644
--- 
a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
+++ 
b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
@@ -209,13 +209,34 @@ public class ScaldingInterpreter extends Interpreter {
     out.reset();
     Code r = null;
     String incomplete = "";
+    boolean inComment = false;
 
     for (int l = 0; l < linesToRun.length; l++) {
       String s = linesToRun[l];
       // check if next line starts with "." (but not ".." or "./") it is 
treated as an invocation
       if (l + 1 < linesToRun.length) {
         String nextLine = linesToRun[l + 1].trim();
-        if (nextLine.startsWith(".") && !nextLine.startsWith("..") && 
!nextLine.startsWith("./")) {
+        boolean continuation = false;
+        if (nextLine.isEmpty()
+                || nextLine.startsWith("//")         // skip empty line or 
comment
+                || nextLine.startsWith("}")
+                || nextLine.startsWith("object")) { // include "} object" for 
Scala companion object
+          continuation = true;
+        } else if (!inComment && nextLine.startsWith("/*")) {
+          inComment = true;
+          continuation = true;
+        } else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
+          inComment = false;
+          continuation = true;
+        } else if (nextLine.length() > 1
+                && nextLine.charAt(0) == '.'
+                && nextLine.charAt(1) != '.'     // ".."
+                && nextLine.charAt(1) != '/') {  // "./"
+          continuation = true;
+        } else if (inComment) {
+          continuation = true;
+        }
+        if (continuation) {
           incomplete += s + "\n";
           continue;
         }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a7a7bdb6/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
 
b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
index 198fd62..08c67da 100644
--- 
a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
+++ 
b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
@@ -89,6 +89,17 @@ public class ScaldingInterpreterTest {
   }
 
   @Test
+  public void testNextLineComments() {
+    assertEquals(InterpreterResult.Code.SUCCESS, 
repl.interpret("\"123\"\n/*comment here\n*/.toInt", context).code());
+  }
+
+  @Test
+  public void testNextLineCompanionObject() {
+    String code = "class Counter {\nvar value: Long = 0\n}\n // comment\n\n 
object Counter {\n def apply(x: Long) = new Counter()\n}";
+    assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret(code, 
context).code());
+  }
+
+  @Test
   public void testBasicIntp() {
     assertEquals(InterpreterResult.Code.SUCCESS,
         repl.interpret("val a = 1\nval b = 2", context).code());

Reply via email to