Repository: flink Updated Branches: refs/heads/master d1c93d286 -> be055b7a9
[Storm-Compatibility] Forward Storm Kryo registrations to Flink This closes #1495. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be055b7a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be055b7a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be055b7a Branch: refs/heads/master Commit: be055b7a9f8ecb09e5e4e0bbddb98639173a09a7 Parents: d1c93d2 Author: mjsax <mj...@apache.org> Authored: Sun Jan 10 16:59:37 2016 +0100 Committer: mjsax <mj...@apache.org> Committed: Thu Jan 14 23:20:18 2016 +0100 ---------------------------------------------------------------------- .../org/apache/flink/storm/api/FlinkClient.java | 46 +++++++++++++++++--- .../flink/storm/api/FlinkLocalCluster.java | 14 +++--- 2 files changed, 45 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/be055b7a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java index fa7ae79..2ad7f56 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java @@ -31,8 +31,10 @@ import backtype.storm.generated.NotAliveException; import backtype.storm.utils.NimbusClient; import backtype.storm.utils.Utils; +import com.esotericsoftware.kryo.Serializer; import com.google.common.collect.Lists; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.client.program.Client; import org.apache.flink.client.program.JobWithJars; @@ -48,8 +50,10 @@ import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus; import org.apache.flink.storm.util.StormConfig; - import org.apache.flink.streaming.api.graph.StreamGraph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import scala.Some; import scala.concurrent.Await; import scala.concurrent.Future; @@ -63,6 +67,7 @@ import java.net.URL; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Map.Entry; /** * {@link FlinkClient} mimics a Storm {@link NimbusClient} and {@link Nimbus}{@code .Client} at once, to interact with @@ -70,6 +75,9 @@ import java.util.Map; */ public class FlinkClient { + /** The log used by this client. */ + private static final Logger LOG = LoggerFactory.getLogger(FlinkClient.class); + /** The client's configuration */ private final Map<?,?> conf; /** The jobmanager's host name */ @@ -163,9 +171,8 @@ public class FlinkClient { * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support * uploading a jar file before hand. Jar files are always uploaded directly when a program is submitted. */ - public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology - topology) - throws AlreadyAliveException, InvalidTopologyException { + public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology topology) + throws AlreadyAliveException, InvalidTopologyException { if (this.getTopologyJobId(name) != null) { throw new AlreadyAliveException(); @@ -181,9 +188,11 @@ public class FlinkClient { throw new RuntimeException("Problem with jar file " + uploadedJarLocation, e); } - /* set storm configuration */ - if (this.conf != null) { - topology.getExecutionEnvironment().getConfig().setGlobalJobParameters(new StormConfig(this.conf)); + try { + FlinkClient.addStormConfigToTopology(topology, conf); + } catch(ClassNotFoundException e) { + LOG.error("Could not register class for Kryo serialization.", e); + throw new InvalidTopologyException("Could not register class for Kryo serialization."); } final StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph(); @@ -325,4 +334,27 @@ public class FlinkClient { actorSystem, AkkaUtils.getLookupTimeout(configuration)); } + @SuppressWarnings({ "unchecked", "rawtypes" }) + static void addStormConfigToTopology(FlinkTopology topology, Map conf) throws ClassNotFoundException { + if (conf != null) { + ExecutionConfig flinkConfig = topology.getExecutionEnvironment().getConfig(); + + flinkConfig.setGlobalJobParameters(new StormConfig(conf)); + + // add all registered types to ExecutionConfig + List<?> registeredClasses = (List<?>) conf.get(Config.TOPOLOGY_KRYO_REGISTER); + if (registeredClasses != null) { + for (Object klass : registeredClasses) { + if (klass instanceof String) { + flinkConfig.registerKryoType(Class.forName((String) klass)); + } else { + for (Entry<String,String> register : ((Map<String,String>)klass).entrySet()) { + flinkConfig.registerTypeWithKryoSerializer(Class.forName(register.getKey()), + (Class<? extends Serializer<?>>)Class.forName(register.getValue())); + } + } + } + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/be055b7a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java index 2ce3c0f..04374fd 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java @@ -31,7 +31,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.minicluster.FlinkMiniCluster; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; -import org.apache.flink.storm.util.StormConfig; import org.apache.flink.streaming.api.graph.StreamGraph; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,33 +73,32 @@ public class FlinkLocalCluster { LOG.info("Running Storm topology on FlinkLocalCluster"); boolean submitBlocking = false; - if(conf != null) { - topology.getExecutionEnvironment().getConfig().setGlobalJobParameters(new StormConfig(conf)); - + if (conf != null) { Object blockingFlag = conf.get(SUBMIT_BLOCKING); if(blockingFlag != null && blockingFlag instanceof Boolean) { submitBlocking = ((Boolean)blockingFlag).booleanValue(); } } + FlinkClient.addStormConfigToTopology(topology, conf); + StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph(); streamGraph.setJobName(topologyName); JobGraph jobGraph = streamGraph.getJobGraph(); - if (flink == null) { - + if (this.flink == null) { Configuration configuration = new Configuration(); configuration.addAll(jobGraph.getJobConfiguration()); configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L); configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism()); - flink = new LocalFlinkMiniCluster(configuration, true); + this.flink = new LocalFlinkMiniCluster(configuration, true); this.flink.start(); } - if(submitBlocking) { + if (submitBlocking) { this.flink.submitJobAndWait(jobGraph, false); } else { this.flink.submitJobDetached(jobGraph);