Repository: incubator-flink
Updated Branches:
  refs/heads/master 227e40fe1 -> 33964003c


[streaming] Streaming local execution fix to create proper jobclient for 
minicluster


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/33964003
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/33964003
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/33964003

Branch: refs/heads/master
Commit: 33964003c7f85d0884edac8150628907a327edc1
Parents: 227e40f
Author: Gyula Fora <[email protected]>
Authored: Fri Dec 19 08:46:26 2014 +0100
Committer: Gyula Fora <[email protected]>
Committed: Fri Dec 19 08:46:26 2014 +0100

----------------------------------------------------------------------
 .../flink/streaming/util/ClusterUtil.java       | 29 ++++++++------------
 1 file changed, 11 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/33964003/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 8fb6554..4362510 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -17,17 +17,16 @@
 
 package org.apache.flink.streaming.util;
 
-import java.net.InetSocketAddress;
-
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import akka.actor.ActorRef;
+
 public class ClusterUtil {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ClusterUtil.class);
@@ -43,8 +42,8 @@ public class ClusterUtil {
         * @param memorySize
         *            memorySize
         */
-       public static void runOnMiniCluster(JobGraph jobGraph, int 
degreeOfParallelism,
-                                                                               
long memorySize) throws Exception  {
+       public static void runOnMiniCluster(JobGraph jobGraph, int 
degreeOfParallelism, long memorySize)
+                       throws Exception {
 
                Configuration configuration = jobGraph.getJobConfiguration();
 
@@ -58,22 +57,16 @@ public class ClusterUtil {
 
                try {
                        exec = new LocalFlinkMiniCluster(configuration, true);
+                       ActorRef jobClient = exec.getJobClient();
+
+                       JobClient.submitJobAndWait(jobGraph, false, jobClient, 
exec.timeout());
 
-                       Client client = new Client(new 
InetSocketAddress("localhost", exec.getJobManagerRPCPort()),
-                                       configuration, 
ClusterUtil.class.getClassLoader());
-                       client.run(jobGraph, true);
-               } catch (ProgramInvocationException e) {
-                       if 
(e.getMessage().contains("GraphConversionException")) {
-                               throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, 
e);
-                       } else {
-                               throw e;
-                       }
                } catch (Exception e) {
                        throw e;
                } finally {
-                               if(exec != null) {
-                                       exec.stop();
-                               }
+                       if (exec != null) {
+                               exec.stop();
+                       }
                }
        }
 

Reply via email to