Repository: eagle Updated Branches: refs/heads/master 84d40ae6f -> eaad6cf74
[EAGLE-1014] add exception handling in CorrelationSpout.java https://issues.apache.org/jira/browse/EAGLE-1014 Author: Zhao, Qingwen <[email protected]> Closes #927 from qingwen220/EAGLE-1014. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/eaad6cf7 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/eaad6cf7 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/eaad6cf7 Branch: refs/heads/master Commit: eaad6cf74c896a97a061a3f600a0ec64a95c0963 Parents: 84d40ae Author: Zhao, Qingwen <[email protected]> Authored: Tue May 2 11:29:46 2017 -0700 Committer: Jay <[email protected]> Committed: Tue May 2 11:29:46 2017 -0700 ---------------------------------------------------------------------- .../eagle/alert/engine/spout/CorrelationSpout.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/eaad6cf7/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java index e9ee892..4338964 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java @@ -172,6 +172,7 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener wrapper.nextTuple(); } catch (Exception e) { LOG.error("unexpected exception is caught: {}", e.getMessage(), e); + collector.reportError(e); } } @@ -256,9 +257,14 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener LOG.warn(MessageFormat.format("try to create new topic {0}, but found in the active spout list, this may indicate some inconsistency", topic)); continue; } - KafkaSpoutWrapper newWrapper = createKafkaSpout(ConfigFactory.parseMap(dataSourceProperties.get(topic)).withFallback(this.config), - conf, context, collector, topic, newSchemaName.get(topic), newMeta, sds); - newKafkaSpoutList.put(topic, newWrapper); + try { + KafkaSpoutWrapper newWrapper = createKafkaSpout(ConfigFactory.parseMap(dataSourceProperties.get(topic)).withFallback(this.config), + conf, context, collector, topic, newSchemaName.get(topic), newMeta, sds); + newKafkaSpoutList.put(topic, newWrapper); + } catch (Exception e) { + LOG.error("fail to create KafkaSpoutWrapper for topic {} due to {}", topic, e.getMessage(), e); + collector.reportError(e); + } } // iterate remove topics and then close KafkaSpout for (String topic : removeTopics) { @@ -285,6 +291,8 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener this.cachedSpoutSpec = newMeta; this.kafkaSpoutList = newKafkaSpoutList; this.sds = sds; + + LOG.info("after CorrelationSpout reloads, {} kafkaSpouts are generated for {} topics", kafkaSpoutList.size(), topics.size()); } /**
