Repository: flink
Updated Branches:
  refs/heads/master d1c93d286 -> be055b7a9


[Storm-Compatibility] Forward Storm Kryo registrations to Flink

This closes #1495.
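
For context: a Storm topology declares its custom Kryo types and serializers on its Config before submission, and with this change those registrations are forwarded into Flink's ExecutionConfig (previously they were not forwarded). A minimal sketch of the Storm-side registrations that get forwarded; MyPojo and MyPojoSerializer are placeholder classes, not part of this commit:

    import backtype.storm.Config;

    Config conf = new Config();
    // plain type registration -> stored as a String entry in TOPOLOGY_KRYO_REGISTER
    conf.registerSerialization(MyPojo.class);
    // type plus custom Kryo serializer -> stored as a Map entry in TOPOLOGY_KRYO_REGISTER
    conf.registerSerialization(MyPojo.class, MyPojoSerializer.class);

On the Flink side these entries are expected to translate into ExecutionConfig#registerKryoType and ExecutionConfig#registerTypeWithKryoSerializer calls, as done by the new addStormConfigToTopology helper in FlinkClient below.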


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be055b7a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be055b7a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be055b7a

Branch: refs/heads/master
Commit: be055b7a9f8ecb09e5e4e0bbddb98639173a09a7
Parents: d1c93d2
Author: mjsax <mj...@apache.org>
Authored: Sun Jan 10 16:59:37 2016 +0100
Committer: mjsax <mj...@apache.org>
Committed: Thu Jan 14 23:20:18 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/storm/api/FlinkClient.java | 46 +++++++++++++++++---
 .../flink/storm/api/FlinkLocalCluster.java      | 14 +++---
 2 files changed, 45 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/be055b7a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index fa7ae79..2ad7f56 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -31,8 +31,10 @@ import backtype.storm.generated.NotAliveException;
 import backtype.storm.utils.NimbusClient;
 import backtype.storm.utils.Utils;
 
+import com.esotericsoftware.kryo.Serializer;
 import com.google.common.collect.Lists;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
@@ -48,8 +50,10 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.storm.util.StormConfig;
-
 import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import scala.Some;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -63,6 +67,7 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * {@link FlinkClient} mimics a Storm {@link NimbusClient} and {@link Nimbus}{@code .Client} at once, to interact with
@@ -70,6 +75,9 @@ import java.util.Map;
  */
 public class FlinkClient {
 
+       /** The log used by this client. */
+       private static final Logger LOG = LoggerFactory.getLogger(FlinkClient.class);
+
        /** The client's configuration */
        private final Map<?,?> conf;
        /** The jobmanager's host name */
@@ -163,9 +171,8 @@ public class FlinkClient {
         * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
         * uploading a jar file before hand. Jar files are always uploaded directly when a program is submitted.
         */
-       public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology
-                       topology)
-                                       throws AlreadyAliveException, InvalidTopologyException {
+       public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology topology)
+                       throws AlreadyAliveException, InvalidTopologyException {
 
                if (this.getTopologyJobId(name) != null) {
                        throw new AlreadyAliveException();
@@ -181,9 +188,11 @@ public class FlinkClient {
                        throw new RuntimeException("Problem with jar file " + uploadedJarLocation, e);
                }
 
-               /* set storm configuration */
-               if (this.conf != null) {
-                       topology.getExecutionEnvironment().getConfig().setGlobalJobParameters(new StormConfig(this.conf));
+               try {
+                       FlinkClient.addStormConfigToTopology(topology, conf);
+               } catch(ClassNotFoundException e) {
+                       LOG.error("Could not register class for Kryo serialization.", e);
+                       throw new InvalidTopologyException("Could not register class for Kryo serialization.");
                }
 
                final StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
@@ -325,4 +334,27 @@ public class FlinkClient {
                                actorSystem, AkkaUtils.getLookupTimeout(configuration));
        }
 
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       static void addStormConfigToTopology(FlinkTopology topology, Map conf) throws ClassNotFoundException {
+               if (conf != null) {
+                       ExecutionConfig flinkConfig = topology.getExecutionEnvironment().getConfig();
+
+                       flinkConfig.setGlobalJobParameters(new StormConfig(conf));
+
+                       // add all registered types to ExecutionConfig
+                       List<?> registeredClasses = (List<?>) conf.get(Config.TOPOLOGY_KRYO_REGISTER);
+                       if (registeredClasses != null) {
+                               for (Object klass : registeredClasses) {
+                                       if (klass instanceof String) {
+                                               flinkConfig.registerKryoType(Class.forName((String) klass));
+                                       } else {
+                                               for (Entry<String,String> register : ((Map<String,String>)klass).entrySet()) {
+                                                       flinkConfig.registerTypeWithKryoSerializer(Class.forName(register.getKey()),
+                                                                       (Class<? extends Serializer<?>>)Class.forName(register.getValue()));
+                                               }
+                                       }
+                               }
+                       }
+               }
+       }
 }
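
A note on the data the new addStormConfigToTopology helper walks over: Storm keeps Kryo registrations under Config.TOPOLOGY_KRYO_REGISTER as a list whose elements are either plain class-name strings or single-entry maps from a class name to a serializer class name, which matches the two branches of the loop above. The snippet below only sketches that structure; the org.example class names are placeholders, and since the helper is package-private it could only be invoked like this from code inside org.apache.flink.storm.api (e.g. a test):

    // assumes java.util.* and backtype.storm.Config are imported; class names are made up
    List<Object> kryoRegister = new ArrayList<Object>();
    kryoRegister.add("org.example.MyPojo");                                  // -> registerKryoType(...)
    kryoRegister.add(Collections.singletonMap(
            "org.example.MyPojo", "org.example.MyPojoSerializer"));          // -> registerTypeWithKryoSerializer(...)

    Map<String, Object> conf = new HashMap<String, Object>();
    conf.put(Config.TOPOLOGY_KRYO_REGISTER, kryoRegister);
    FlinkClient.addStormConfigToTopology(topology, conf);                    // topology: some FlinkTopology instance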

http://git-wip-us.apache.org/repos/asf/flink/blob/be055b7a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
index 2ce3c0f..04374fd 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
@@ -31,7 +31,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,33 +73,32 @@ public class FlinkLocalCluster {
                LOG.info("Running Storm topology on FlinkLocalCluster");
 
                boolean submitBlocking = false;
-               if(conf != null) {
-                       topology.getExecutionEnvironment().getConfig().setGlobalJobParameters(new StormConfig(conf));
-
+               if (conf != null) {
                        Object blockingFlag = conf.get(SUBMIT_BLOCKING);
                        if(blockingFlag != null && blockingFlag instanceof Boolean) {
                                submitBlocking = ((Boolean)blockingFlag).booleanValue();
                        }
                }
 
+               FlinkClient.addStormConfigToTopology(topology, conf);
+
                StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
                streamGraph.setJobName(topologyName);
 
                JobGraph jobGraph = streamGraph.getJobGraph();
 
-               if (flink == null) {
-
+               if (this.flink == null) {
                        Configuration configuration = new Configuration();
                        configuration.addAll(jobGraph.getJobConfiguration());
 
                        configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
                        configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
 
-                       flink = new LocalFlinkMiniCluster(configuration, true);
+                       this.flink = new LocalFlinkMiniCluster(configuration, true);
                        this.flink.start();
                }
 
-               if(submitBlocking) {
+               if (submitBlocking) {
                        this.flink.submitJobAndWait(jobGraph, false);
                } else {
                        this.flink.submitJobDetached(jobGraph);
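
Usage-wise (a sketch, not part of this commit): a local test run against FlinkLocalCluster now forwards the Storm Kryo registrations and can still use the blocking-submit flag. buildTopology() and MyPojo are placeholders, and SUBMIT_BLOCKING and getLocalCluster() are assumed to exist on FlinkLocalCluster as in the current flink-storm module:

    import backtype.storm.Config;
    import org.apache.flink.storm.api.FlinkLocalCluster;

    Config conf = new Config();
    conf.registerSerialization(MyPojo.class);               // forwarded to Flink's ExecutionConfig
    conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true);      // wait for the job to finish

    FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
    cluster.submitTopology("kryo-forwarding-example", conf, buildTopology());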
