Repository: incubator-flink Updated Branches: refs/heads/master 227e40fe1 -> 33964003c
[streaming] Streaming local execution fix to create proper jobclient for minicluster Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/33964003 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/33964003 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/33964003 Branch: refs/heads/master Commit: 33964003c7f85d0884edac8150628907a327edc1 Parents: 227e40f Author: Gyula Fora <[email protected]> Authored: Fri Dec 19 08:46:26 2014 +0100 Committer: Gyula Fora <[email protected]> Committed: Fri Dec 19 08:46:26 2014 +0100 ---------------------------------------------------------------------- .../flink/streaming/util/ClusterUtil.java | 29 ++++++++------------ 1 file changed, 11 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/33964003/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java index 8fb6554..4362510 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java @@ -17,17 +17,16 @@ package org.apache.flink.streaming.util; -import java.net.InetSocketAddress; - -import org.apache.flink.client.program.Client; -import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobClient; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import akka.actor.ActorRef; + public class ClusterUtil { private static final Logger LOG = LoggerFactory.getLogger(ClusterUtil.class); @@ -43,8 +42,8 @@ public class ClusterUtil { * @param memorySize * memorySize */ - public static void runOnMiniCluster(JobGraph jobGraph, int degreeOfParallelism, - long memorySize) throws Exception { + public static void runOnMiniCluster(JobGraph jobGraph, int degreeOfParallelism, long memorySize) + throws Exception { Configuration configuration = jobGraph.getJobConfiguration(); @@ -58,22 +57,16 @@ public class ClusterUtil { try { exec = new LocalFlinkMiniCluster(configuration, true); + ActorRef jobClient = exec.getJobClient(); + + JobClient.submitJobAndWait(jobGraph, false, jobClient, exec.timeout()); - Client client = new Client(new InetSocketAddress("localhost", exec.getJobManagerRPCPort()), - configuration, ClusterUtil.class.getClassLoader()); - client.run(jobGraph, true); - } catch (ProgramInvocationException e) { - if (e.getMessage().contains("GraphConversionException")) { - throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e); - } else { - throw e; - } } catch (Exception e) { throw e; } finally { - if(exec != null) { - exec.stop(); - } + if (exec != null) { + exec.stop(); + } } }
