[FLINK-2761] [scala-shell] Prevent creation of new environment in Scala Shell

This closes #1180


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/16afb8ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/16afb8ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/16afb8ec

Branch: refs/heads/master
Commit: 16afb8ec66a2a07733b9090bffe96af1e913bb63
Parents: 0a8df6d
Author: Sachin Goel <[email protected]>
Authored: Fri Sep 25 13:43:45 2015 +0530
Committer: Stephan Ewen <[email protected]>
Committed: Tue Sep 29 12:24:54 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/ScalaShellRemoteEnvironment.java | 11 +++++++++++
 .../org.apache.flink/api/scala/FlinkILoop.scala     |  1 +
 .../apache/flink/api/scala/ScalaShellITSuite.scala  | 16 +++++++++++++---
 3 files changed, 25 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/16afb8ec/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
 
b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
index a7dc708..859c686 100644
--- 
a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
+++ 
b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -84,4 +84,15 @@ public class ScalaShellRemoteEnvironment extends 
RemoteEnvironment {
                
executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
                return executor.executePlan(p);
        }
+
+       public void setAsContext() {
+               ExecutionEnvironmentFactory factory = new 
ExecutionEnvironmentFactory() {
+                       @Override
+                       public ExecutionEnvironment 
createExecutionEnvironment() {
+                               throw new 
UnsupportedOperationException("Execution Environment is already defined" +
+                                               " for this shell.");
+                       }
+               };
+               initializeContextEnvironment(factory);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/16afb8ec/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
 
b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
index 2797e4b..1e96ba3 100644
--- 
a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
+++ 
b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
@@ -54,6 +54,7 @@ class FlinkILoop(
   // remote environment
   private val remoteEnv: ScalaShellRemoteEnvironment = {
     val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)
+    remoteEnv.setAsContext()
     remoteEnv
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/16afb8ec/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
 
b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
index e932cd2..7648c50 100644
--- 
a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
+++ 
b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
@@ -33,6 +33,19 @@ import scala.tools.nsc.Settings
 @RunWith(classOf[JUnitRunner])
 class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
 
+  test("Prevent re-creation of environment") {
+
+    val input: String =
+      """
+        val env = ExecutionEnvironment.getExecutionEnvironment
+      """.stripMargin
+
+    val output: String = processInShell(input)
+
+    output should include("UnsupportedOperationException: Execution 
Environment is already " +
+      "defined for this shell")
+  }
+
   test("Iteration test with iterative Pi example") {
 
     val input: String =
@@ -224,9 +237,6 @@ class ScalaShellITSuite extends FunSuite with Matchers with 
BeforeAndAfterAll {
       false,
       false)
 
-    val clusterEnvironment = new TestEnvironment(cl, parallelism)
-    clusterEnvironment.setAsContext()
-
     cluster = Some(cl)
   }
 

Reply via email to