Repository: eagle
Updated Branches:
  refs/heads/master eaad6cf74 -> 47f00f159


[EAGLE-1015] add an interface to add storm configuration in an application

https://issues.apache.org/jira/browse/EAGLE-1015

Support to add storm config value of type number or string in an application. 
However, to make storm overrides these custom values,  one rule is the 
configuration must start with 'application.storm.'. For example:

`application.storm.workers` to override `workers`
`application.storm.nimbus.host` to override `nimbus.host`

Author: Zhao, Qingwen <[email protected]>

Closes #928 from qingwen220/EAGLE-1015.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/47f00f15
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/47f00f15
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/47f00f15

Branch: refs/heads/master
Commit: 47f00f159231958fb39748b4a6a01c4520371dec
Parents: eaad6cf
Author: Zhao, Qingwen <[email protected]>
Authored: Fri May 5 12:54:21 2017 +0800
Committer: Zhao, Qingwen <[email protected]>
Committed: Fri May 5 12:54:21 2017 +0800

----------------------------------------------------------------------
 .../environment/impl/StormExecutionRuntime.java | 31 +++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/47f00f15/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index 2b4180d..8045e43 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -22,6 +22,8 @@ import backtype.storm.generated.*;
 import backtype.storm.utils.NimbusClient;
 import com.google.common.base.Preconditions;
 import com.typesafe.config.ConfigRenderOptions;
+import com.typesafe.config.ConfigValue;
+import org.apache.commons.lang.math.NumberUtils;
 import org.apache.eagle.alert.engine.runner.StormMetricTaggedConsumer;
 import org.apache.eagle.alert.metric.MetricConfigs;
 import org.apache.eagle.app.Application;
@@ -36,6 +38,7 @@ import scala.Int;
 import storm.trident.spout.RichSpoutBatchExecutor;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 public class StormExecutionRuntime implements 
ExecutionRuntime<StormEnvironment,StormTopology> {
@@ -67,14 +70,16 @@ public class StormExecutionRuntime implements 
ExecutionRuntime<StormEnvironment,
         return this.environment;
     }
 
-    public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = 
"topology.message.timeout.secs";
+    private static final String WORKERS = "workers";
+    private static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = 
"topology.message.timeout.secs";
 
     private static final String STORM_NIMBUS_HOST_CONF_PATH = 
"application.storm.nimbusHost";
-    private static final String STORM_NIMBUS_HOST_DEFAULT = "localhost";
-    private static final Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
     private static final String STORM_NIMBUS_THRIFT_CONF_PATH = 
"application.storm.nimbusThriftPort";
 
-    private static final String WORKERS = "workers";
+    private static final String APP_STORM_CONF_PATH_DEFAULT = 
"application.storm";
+
+    private static final String STORM_NIMBUS_HOST_DEFAULT = "localhost";
+    private static final Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
 
     private backtype.storm.Config getStormConfig(com.typesafe.config.Config 
config) {
         backtype.storm.Config conf = new backtype.storm.Config();
@@ -85,16 +90,17 @@ public class StormExecutionRuntime implements 
ExecutionRuntime<StormEnvironment,
         conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 
Int.box(16384));
         conf.put(backtype.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, 
Int.box(20480000));
         String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
+
         if (environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
             nimbusHost = 
environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH);
-            LOG.info("Overriding {} = 
{}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost);
+            LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH, 
nimbusHost);
         } else {
-            LOG.info("Using default {} = 
{}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
+            LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH, 
STORM_NIMBUS_HOST_DEFAULT);
         }
         Integer nimbusThriftPort =  STORM_NIMBUS_THRIFT_DEFAULT;
         if (environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
             nimbusThriftPort = 
environment.config().getInt(STORM_NIMBUS_THRIFT_CONF_PATH);
-            LOG.info("Overriding {} = 
{}",STORM_NIMBUS_THRIFT_CONF_PATH,nimbusThriftPort);
+            LOG.info("Overriding {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH, 
nimbusThriftPort);
         } else {
             LOG.info("Using default {} = 
{}",STORM_NIMBUS_THRIFT_CONF_PATH,STORM_NIMBUS_THRIFT_DEFAULT);
         }
@@ -112,6 +118,17 @@ public class StormExecutionRuntime implements 
ExecutionRuntime<StormEnvironment,
         if (config.hasPath(MetricConfigs.METRIC_SINK_CONF)) {
             conf.registerMetricsConsumer(StormMetricTaggedConsumer.class, 
config.root().render(ConfigRenderOptions.concise()), 1);
         }
+
+        if (config.hasPath(APP_STORM_CONF_PATH_DEFAULT)) {
+            com.typesafe.config.Config appStormConf = 
config.getConfig(APP_STORM_CONF_PATH_DEFAULT);
+            for (Map.Entry<String, ConfigValue> entry: 
appStormConf.entrySet()) {
+                if 
(NumberUtils.isNumber(entry.getValue().unwrapped().toString())) {
+                    conf.put(entry.getKey(), 
appStormConf.getNumber(entry.getKey()));
+                } else {
+                    conf.put(entry.getKey(), entry.getValue().unwrapped());
+                }
+            }
+        }
         return conf;
     }
 

Reply via email to