[ https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14733429#comment-14733429 ]

ASF GitHub Bot commented on FLINK-2525:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38844529
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java ---
    @@ -107,4 +112,40 @@ public static TopologyContext convertToTopologyContext(final StreamingRuntimeCon
                return new FlinkTopologyContext(new StormTopology(spoutSpecs, bolts, null), taskToComponents, taskId);
        }
     
    +   /**
    +    * Get storm configuration from StreamingRuntimeContext.
    +    *
    +    * @param ctx
    +    *            The RuntimeContext of operator.
    +    * @return The storm configuration map.
    +    * @throws Exception
    +    */
    +   public static Map getStormConfFromContext(final RuntimeContext ctx)
    +                   throws Exception {
    +           Map stormConf = null;
    +           if (ctx instanceof StreamingRuntimeContext) {
    +                   Configuration jobConfiguration = ((StreamingRuntimeContext) ctx).getJobConfiguration();
    +
    +                   if (jobConfiguration != null) {
    +                           /* topologies mode */
    +                           stormConf = (Map) InstantiationUtil.readObjectFromConfig(jobConfiguration, StormConfig.STORM_DEFAULT_CONFIG, Map.class.getClassLoader());
    --- End diff --
    
    Since the map is untyped, users may put arbitrary objects into it, including instances of classes from their own user code.
    This would lead to `ClassNotFoundException`s when the code runs on a cluster, because `Map.class.getClassLoader()` cannot see user classes.
    Can you use the class loader of `StormWrapperSetupHelper` instead?
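
    The concern can be reproduced without Flink. The following is a minimal sketch, not the code from the pull request: `ClassLoaderDemo`, `UserValue`, and `LoaderAwareInputStream` are hypothetical names, and the stream only imitates the idea of resolving classes against an explicitly supplied class loader. `java.util.Map` is loaded by the bootstrap loader, so `Map.class.getClassLoader()` returns `null` and only JDK classes can be resolved through it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class ClassLoaderDemo {

    /** Hypothetical stand-in for a user-code class placed into the untyped map. */
    public static class UserValue implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String name;

        public UserValue(String name) {
            this.name = name;
        }
    }

    /** ObjectInputStream that resolves classes against an explicitly given loader. */
    static class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            // A null loader means the bootstrap loader, which only knows JDK classes.
            return Class.forName(desc.getName(), false, loader);
        }
    }

    public static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    public static Object deserialize(byte[] bytes, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new LoaderAwareInputStream(new ByteArrayInputStream(bytes), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> stormConf = new HashMap<>();
        stormConf.put("user.value", new UserValue("example"));
        byte[] bytes = serialize(stormConf);

        try {
            // Same loader choice as in the diff: the loader of java.util.Map.
            deserialize(bytes, Map.class.getClassLoader());
        } catch (ClassNotFoundException e) {
            System.out.println("Bootstrap loader failed: " + e.getMessage());
        }

        // The loader of a class that ships with the user code can resolve UserValue.
        Map<?, ?> restored = (Map<?, ?>) deserialize(bytes, ClassLoaderDemo.class.getClassLoader());
        System.out.println(((UserValue) restored.get("user.value")).name);
    }
}
```

    In this sketch `ClassLoaderDemo.class.getClassLoader()` plays the role that the class loader of `StormWrapperSetupHelper` would play in the wrapper code.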


> Add configuration support in Storm-compatibility
> ------------------------------------------------
>
>                 Key: FLINK-2525
>                 URL: https://issues.apache.org/jira/browse/FLINK-2525
>             Project: Flink
>          Issue Type: New Feature
>          Components: Storm Compatibility
>            Reporter: fangfengbin
>            Assignee: fangfengbin
>
> Spouts and Bolts are initialized by a call to `Spout.open(...)` and 
> `Bolt.prepare()`, respectively. Both methods take a config `Map` as their 
> first parameter. This map is currently not populated, so Spouts and Bolts 
> cannot be configured with user-defined parameters. To support this feature, 
> the spout and bolt wrapper classes need to be extended to create a proper 
> `Map` object. Furthermore, the clients need to be extended to take a `Map` 
> and translate it into a Flink `Configuration` that is forwarded to the 
> wrappers for proper initialization of the map.
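
The round trip described in the issue (client takes a `Map`, stores it in the job configuration, wrapper rebuilds the `Map`) might look roughly like the sketch below. It is an illustration under stated assumptions and not the pull request's implementation: a plain `Map<String, byte[]>` stands in for Flink's `Configuration`, `CONF_KEY` is a hypothetical key, and returning an empty map when nothing was stored is an assumed fallback.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

public class StormConfRoundTrip {

    /** Hypothetical key under which the serialized map is stored. */
    static final String CONF_KEY = "storm.conf";

    /** Client side: serialize the user's map and store it under CONF_KEY. */
    public static Map<String, byte[]> toJobConfig(Map<String, Object> stormConf)
            throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            // Copy into a HashMap so the stored object is known to be serializable.
            out.writeObject(new HashMap<>(stormConf));
        }
        Map<String, byte[]> jobConfig = new HashMap<>();
        jobConfig.put(CONF_KEY, buffer.toByteArray());
        return jobConfig;
    }

    /** Wrapper side: rebuild the map, or use an empty one if nothing was stored. */
    @SuppressWarnings("unchecked")
    public static Map<String, Object> fromJobConfig(Map<String, byte[]> jobConfig)
            throws IOException, ClassNotFoundException {
        byte[] bytes = jobConfig.get(CONF_KEY);
        if (bytes == null) {
            return new HashMap<>();
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Map<String, Object>) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Sample entries; the values are made up for illustration.
        Map<String, Object> stormConf = new HashMap<>();
        stormConf.put("topology.workers", 4);
        stormConf.put("input.path", "/tmp/words.txt");

        Map<String, Object> restored = fromJobConfig(toJobConfig(stormConf));
        System.out.println(restored.equals(stormConf)); // prints "true"
    }
}
```

The restored map is what a wrapper would then hand to `Spout.open(...)` or `Bolt.prepare()` as the config parameter.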



