Repository: incubator-eagle
Updated Branches:
  refs/heads/master 56e7048ff -> e520e4011


[EAGLE-673] add numOfPublishExecutors to alert engine topology

Author: wujinhu <wujinhu...@126.com>

Closes #555 from wujinhu/EAGLE-673.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/e520e401
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/e520e401
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/e520e401

Branch: refs/heads/master
Commit: e520e4011796ebdea52f80ca43b9bffddf3aa50a
Parents: 56e7048
Author: wujinhu <wujinhu...@126.com>
Authored: Mon Oct 24 14:09:43 2016 +0800
Committer: wujinhu <wujinhu...@126.com>
Committed: Mon Oct 24 14:09:43 2016 +0800

----------------------------------------------------------------------
 ...agle.alert.app.AlertUnitTopologyAppProvider.xml |  7 +++++++
 .../src/main/resources/application.conf            |  1 +
 .../alert/engine/runner/UnitTopologyRunner.java    | 17 ++++++++++++-----
 3 files changed, 20 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e520e401/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
index 6ef96c7..28f7db4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
@@ -51,6 +51,13 @@
             <required>false</required>
         </property>
         <property>
+            <name>topology.numOfPublishExecutors</name>
+            <displayName>Publisher Executor Number</displayName>
+            <value>1</value>
+            <description>Number of publish executors</description>
+            <required>false</required>
+        </property>
+        <property>
             <name>topology.numOfPublishTasks</name>
             <displayName>Publisher Tasks Number</displayName>
             <value>1</value>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e520e401/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf
index 1a25cfa..46f5b08 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf
@@ -22,6 +22,7 @@
     "numOfSpoutTasks" : 1,
     "numOfRouterBolts" : 4,
     "numOfAlertBolts" : 10,
+    "numOfPublishExecutors" : 1,
     "numOfPublishTasks" : 1,
     "messageTimeoutSecs": 3600,
     "localMode" : "true"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e520e401/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
index 88cfb9b..287d5db 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
@@ -63,6 +63,7 @@ public class UnitTopologyRunner {
     public static final String ROUTER_TASK_NUM = "topology.numOfRouterBolts";
     public static final String ALERT_TASK_NUM = "topology.numOfAlertBolts";
     public static final String PUBLISH_TASK_NUM = "topology.numOfPublishTasks";
+    public static final String PUBLISH_EXECUTOR_NUM = 
"topology.numOfPublishExecutors";
     public static final String LOCAL_MODE = "topology.localMode";
     public static final String MESSAGE_TIMEOUT_SECS = 
"topology.messageTimeoutSecs";
     public static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 3600;
@@ -88,6 +89,7 @@ public class UnitTopologyRunner {
                      int numOfSpoutTasks,
                      int numOfRouterBolts,
                      int numOfAlertBolts,
+                     int numOfPublishExecutors,
                      int numOfPublishTasks,
                      Config config,
                      boolean localMode) {
@@ -104,7 +106,7 @@ public class UnitTopologyRunner {
         }
 
         stormConfig.setNumWorkers(numOfTotalWorkers);
-        StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, 
numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config);
+        StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, 
numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, 
config);
 
         if (localMode) {
             LOG.info("Submitting as local mode");
@@ -126,10 +128,11 @@ public class UnitTopologyRunner {
         int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
         int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
         int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
+        int numOfPublishExecutors = config.getInt(PUBLISH_EXECUTOR_NUM);
         int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM);
         boolean localMode = config.getBoolean(LOCAL_MODE);
         int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM);
-        run(topologyId, numOfTotalWorkers, numOfSpoutTasks, numOfRouterBolts, 
numOfAlertBolts, numOfPublishTasks, config, localMode);
+        run(topologyId, numOfTotalWorkers, numOfSpoutTasks, numOfRouterBolts, 
numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config, localMode);
     }
 
     public IMetadataChangeNotifyService getMetadataChangeNotifyService() {
@@ -144,6 +147,7 @@ public class UnitTopologyRunner {
                                        int numOfSpoutTasks,
                                        int numOfRouterBolts,
                                        int numOfAlertBolts,
+                                       int numOfPublishExecutors,
                                        int numOfPublishTasks,
                                        Config config) {
         StreamRouterBolt[] routerBolts = new 
StreamRouterBolt[numOfRouterBolts];
@@ -199,7 +203,7 @@ public class UnitTopologyRunner {
         }
 
         // connect alert bolt and alert publish bolt, this is the last bolt in 
the pipeline
-        BoltDeclarer boltDeclarer = builder.setBolt(alertPublishBoltName, 
publisherBolt).setNumTasks(numOfPublishTasks);
+        BoltDeclarer boltDeclarer = builder.setBolt(alertPublishBoltName, 
publisherBolt, numOfPublishExecutors).setNumTasks(numOfPublishTasks);
         for (int i = 0; i < numOfAlertBolts; i++) {
             boltDeclarer.fieldsGrouping(alertBoltNamePrefix + i, new 
Fields(AlertConstants.FIELD_0));
         }
@@ -211,9 +215,10 @@ public class UnitTopologyRunner {
         int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
         int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
         int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
+        int numOfPublishExecutors = config.getInt(PUBLISH_EXECUTOR_NUM);
         int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM);
 
-        return buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, 
numOfAlertBolts, numOfPublishTasks, config);
+        return buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, 
numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config);
     }
 
     // ---------------------------
@@ -224,15 +229,17 @@ public class UnitTopologyRunner {
         int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
         int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
         int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
+        int numOfPublishExecutors = config.getInt(PUBLISH_EXECUTOR_NUM);
         int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM);
 
-        return buildTopologyMetadata(topologyId, numOfSpoutTasks, 
numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config);
+        return buildTopologyMetadata(topologyId, numOfSpoutTasks, 
numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, 
config);
     }
 
     public static Topology buildTopologyMetadata(String topologyId,
                                                  int numOfSpoutTasks,
                                                  int numOfRouterBolts,
                                                  int numOfAlertBolts,
+                                                 int numOfPublishExecutors,
                                                  int numOfPublishTasks,
                                                  Config config) {
         Topology topology = new Topology();

Reply via email to