Repository: incubator-eagle
Updated Branches:
  refs/heads/master 56e7048ff -> e520e4011
[EAGLE-673] add numOfPublishExecutors to alert engine topology

Author: wujinhu <wujinhu...@126.com>

Closes #555 from wujinhu/EAGLE-673.

Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/e520e401
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/e520e401
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/e520e401

Branch: refs/heads/master
Commit: e520e4011796ebdea52f80ca43b9bffddf3aa50a
Parents: 56e7048
Author: wujinhu <wujinhu...@126.com>
Authored: Mon Oct 24 14:09:43 2016 +0800
Committer: wujinhu <wujinhu...@126.com>
Committed: Mon Oct 24 14:09:43 2016 +0800

----------------------------------------------------------------------
 ...agle.alert.app.AlertUnitTopologyAppProvider.xml | 7 +++++++
 .../src/main/resources/application.conf | 1 +
 .../alert/engine/runner/UnitTopologyRunner.java | 17 ++++++++++++-----
 3 files changed, 20 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e520e401/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
index 6ef96c7..28f7db4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
@@ -51,6 +51,13 @@
<required>false</required> </property> <property> + <name>topology.numOfPublishExecutors</name> + <displayName>Publisher Executor Number</displayName> + <value>1</value> + <description>Number of publish executors</description> + <required>false</required> + </property> + <property> <name>topology.numOfPublishTasks</name> <displayName>Publisher Tasks Number</displayName> <value>1</value> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e520e401/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf index 1a25cfa..46f5b08 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf +++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf @@ -22,6 +22,7 @@ "numOfSpoutTasks" : 1, "numOfRouterBolts" : 4, "numOfAlertBolts" : 10, + "numOfPublishExecutors" : 1, "numOfPublishTasks" : 1, "messageTimeoutSecs": 3600, "localMode" : "true" http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e520e401/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java index 88cfb9b..287d5db 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java +++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java @@ -63,6 +63,7 @@ public class UnitTopologyRunner { public static final String ROUTER_TASK_NUM = "topology.numOfRouterBolts"; public static final String ALERT_TASK_NUM = "topology.numOfAlertBolts"; public static final String PUBLISH_TASK_NUM = "topology.numOfPublishTasks"; + public static final String PUBLISH_EXECUTOR_NUM = "topology.numOfPublishExecutors"; public static final String LOCAL_MODE = "topology.localMode"; public static final String MESSAGE_TIMEOUT_SECS = "topology.messageTimeoutSecs"; public static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 3600; @@ -88,6 +89,7 @@ public class UnitTopologyRunner { int numOfSpoutTasks, int numOfRouterBolts, int numOfAlertBolts, + int numOfPublishExecutors, int numOfPublishTasks, Config config, boolean localMode) { @@ -104,7 +106,7 @@ public class UnitTopologyRunner { } stormConfig.setNumWorkers(numOfTotalWorkers); - StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config); + StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config); if (localMode) { LOG.info("Submitting as local mode"); @@ -126,10 +128,11 @@ public class UnitTopologyRunner { int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM); int numOfAlertBolts = config.getInt(ALERT_TASK_NUM); + int numOfPublishExecutors = config.getInt(PUBLISH_EXECUTOR_NUM); int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM); boolean localMode = config.getBoolean(LOCAL_MODE); int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM); - run(topologyId, numOfTotalWorkers, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config, localMode); + run(topologyId, numOfTotalWorkers, numOfSpoutTasks, numOfRouterBolts, 
numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config, localMode); } public IMetadataChangeNotifyService getMetadataChangeNotifyService() { @@ -144,6 +147,7 @@ public class UnitTopologyRunner { int numOfSpoutTasks, int numOfRouterBolts, int numOfAlertBolts, + int numOfPublishExecutors, int numOfPublishTasks, Config config) { StreamRouterBolt[] routerBolts = new StreamRouterBolt[numOfRouterBolts]; @@ -199,7 +203,7 @@ public class UnitTopologyRunner { } // connect alert bolt and alert publish bolt, this is the last bolt in the pipeline - BoltDeclarer boltDeclarer = builder.setBolt(alertPublishBoltName, publisherBolt).setNumTasks(numOfPublishTasks); + BoltDeclarer boltDeclarer = builder.setBolt(alertPublishBoltName, publisherBolt, numOfPublishExecutors).setNumTasks(numOfPublishTasks); for (int i = 0; i < numOfAlertBolts; i++) { boltDeclarer.fieldsGrouping(alertBoltNamePrefix + i, new Fields(AlertConstants.FIELD_0)); } @@ -211,9 +215,10 @@ public class UnitTopologyRunner { int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM); int numOfAlertBolts = config.getInt(ALERT_TASK_NUM); + int numOfPublishExecutors = config.getInt(PUBLISH_EXECUTOR_NUM); int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM); - return buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config); + return buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config); } // --------------------------- @@ -224,15 +229,17 @@ public class UnitTopologyRunner { int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM); int numOfAlertBolts = config.getInt(ALERT_TASK_NUM); + int numOfPublishExecutors = config.getInt(PUBLISH_EXECUTOR_NUM); int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM); - return buildTopologyMetadata(topologyId, numOfSpoutTasks, numOfRouterBolts, 
numOfAlertBolts, numOfPublishTasks, config); + return buildTopologyMetadata(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config); } public static Topology buildTopologyMetadata(String topologyId, int numOfSpoutTasks, int numOfRouterBolts, int numOfAlertBolts, + int numOfPublishExecutors, int numOfPublishTasks, Config config) { Topology topology = new Topology();