Repository: incubator-eagle
Updated Branches:
  refs/heads/master 4df63def8 -> 9a5cdb26d


EAGLE-758: add switch to enable and disable event log

Author: Li, Garrett
Reviewer: ralphsu

This closes #633


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/9a5cdb26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/9a5cdb26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/9a5cdb26

Branch: refs/heads/master
Commit: 9a5cdb26d6db15e05a21cc1d6c70e94aded181d7
Parents: 4df63de
Author: Xiancheng Li <xiancheng...@ebay.com>
Authored: Wed Nov 9 16:59:28 2016 +0800
Committer: Ralph, Su <suliang...@gmail.com>
Committed: Wed Nov 9 19:10:24 2016 +0800

----------------------------------------------------------------------
 .../eagle/alert/engine/runner/AlertBolt.java       | 13 ++++++++++---
 .../eagle/alert/engine/spout/CorrelationSpout.java | 17 ++++++++++-------
 .../engine/spout/SpoutOutputCollectorWrapper.java  |  8 ++++++--
 3 files changed, 26 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9a5cdb26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index bdd9a99..639d338 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p/>
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -63,6 +63,7 @@ public class AlertBolt extends AbstractStreamBolt implements 
AlertBoltSpecListen
     private PolicyGroupEvaluator policyGroupEvaluator;
     private AlertStreamCollector alertOutputCollector;
     private String boltId;
+    private boolean logEventEnabled;
     private volatile Object outputLock;
     // mapping from policy name to PolicyDefinition
     private volatile Map<String, PolicyDefinition> cachedPolicies = new 
HashMap<>(); // for one streamGroup, there are multiple policies
@@ -74,6 +75,10 @@ public class AlertBolt extends AbstractStreamBolt implements 
AlertBoltSpecListen
         this.boltId = boltId;
         this.policyGroupEvaluator = new PolicyGroupEvaluatorImpl(boltId + 
"-evaluator_stage1"); // use bolt id as evaluatorId.
         // TODO next stage evaluator
+
+        if (config.hasPath("topology.logEventEnabled")) {
+            logEventEnabled = config.getBoolean("topology.logEventEnabled");
+        }
     }
 
     @Override
@@ -81,7 +86,9 @@ public class AlertBolt extends AbstractStreamBolt implements 
AlertBoltSpecListen
         this.streamContext.counter().scope("execute_count").incr();
         try {
             PartitionedEvent pe = 
deserialize(input.getValueByField(AlertConstants.FIELD_0));
-            LOG.info("Alert bolt {} received event: {}", boltId, pe.getEvent());
+            if (logEventEnabled) {
+                LOG.info("Alert bolt {} received event: {}", boltId, pe.getEvent());
+            }
             String streamEventVersion = pe.getEvent().getMetaVersion();
 
             if (streamEventVersion == null) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9a5cdb26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
index 67074ce..63e94ca 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
@@ -48,7 +48,6 @@ import java.util.*;
 
 /**
  * wrap KafkaSpout to provide parallel processing of messages for multiple 
Kafka topics
- *
  * <p>1. onNewConfig() is interface for outside to update new metadata. Upon 
new metadata, this class will calculate if there is any new topic, removed 
topic or
  * updated topic</p>
  */
@@ -183,7 +182,7 @@ public class CorrelationSpout extends BaseRichSpout 
implements SpoutSpecListener
         // decode and get topic
         KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
         KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
-        if (spout !=  null) {
+        if (spout != null) {
             spout.ack(id.id);
         }
     }
@@ -194,7 +193,7 @@ public class CorrelationSpout extends BaseRichSpout 
implements SpoutSpecListener
         KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
         LOG.error("Failing message {}, with topic {}", msgId, id.topic);
         KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
-        if (spout !=  null) {
+        if (spout != null) {
             spout.fail(id.id);
         }
     }
@@ -287,8 +286,8 @@ public class CorrelationSpout extends BaseRichSpout 
implements SpoutSpecListener
      * consumerId by default is EagleConsumer unless it is specified by 
"stormKafkaEagleConsumer"
      * Note2: put topologyId as part of zkState because one topic by design 
can be consumed by multiple topologies so one topology needs to know
      * processed offset for itself
-     *
      * <p>TODO: Should avoid use Config.get in deep calling stack, should 
generate config bean as early as possible
+     * </p>
      *
      * @param conf
      * @param context
@@ -311,6 +310,10 @@ public class CorrelationSpout extends BaseRichSpout 
implements SpoutSpecListener
         if (config.hasPath("spout.stormKafkaTransactionZkPath")) {
             transactionZkRoot = 
config.getString("spout.stormKafkaTransactionZkPath");
         }
+        boolean logEventEnabled = false;
+        if (config.hasPath("topology.logEventEnabled")) {
+            logEventEnabled = config.getBoolean("topology.logEventEnabled");
+        }
         // write partition offset etc. into zkRoot+id, see 
PartitionManager.committedPath
         String zkStateTransactionRelPath = 
DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH;
         if (config.hasPath("spout.stormKafkaEagleConsumer")) {
@@ -339,7 +342,7 @@ public class CorrelationSpout extends BaseRichSpout 
implements SpoutSpecListener
 
         spoutConfig.scheme = createMultiScheme(conf, topic, schemeClsName);
         KafkaSpoutWrapper wrapper = new KafkaSpoutWrapper(spoutConfig, 
kafkaSpoutMetric);
-        SpoutOutputCollectorWrapper collectorWrapper = new SpoutOutputCollectorWrapper(this, collector, topic, spoutSpec, numOfRouterBolts, sds, this.serializer);
+        SpoutOutputCollectorWrapper collectorWrapper = new SpoutOutputCollectorWrapper(this, collector, topic, spoutSpec, numOfRouterBolts, sds, this.serializer, logEventEnabled);
         wrapper.open(conf, context, collectorWrapper);
 
         if (LOG.isInfoEnabled()) {
@@ -352,8 +355,8 @@ public class CorrelationSpout extends BaseRichSpout 
implements SpoutSpecListener
         Object scheme = SchemeBuilder.buildFromClsName(schemeClsName, topic, 
conf);
         if (scheme instanceof MultiScheme) {
             return (MultiScheme) scheme;
-        } else if (scheme instanceof  Scheme) {
-            return new SchemeAsMultiScheme((Scheme)scheme);
+        } else if (scheme instanceof Scheme) {
+            return new SchemeAsMultiScheme((Scheme) scheme);
         } else {
             LOG.error("create spout scheme failed.");
             throw new IllegalArgumentException("create spout scheme failed.");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9a5cdb26/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
index b75814e..1036a36 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
@@ -50,6 +50,7 @@ public class SpoutOutputCollectorWrapper extends 
SpoutOutputCollector implements
     private final String topic;
     private final PartitionedEventSerializer serializer;
     private int numOfRouterBolts;
+    private boolean logEventEnabled;
 
     private volatile List<StreamRepartitionMetadata> 
streamRepartitionMetadataList;
     private volatile Tuple2StreamConverter converter;
@@ -67,7 +68,7 @@ public class SpoutOutputCollectorWrapper extends 
SpoutOutputCollector implements
                                        String topic,
                                        SpoutSpec spoutSpec,
                                        int numGroupbyBolts,
-                                       Map<String, StreamDefinition> sds, PartitionedEventSerializer serializer) {
+                                       Map<String, StreamDefinition> sds, PartitionedEventSerializer serializer, boolean logEventEnabled) {
         super(delegate);
         this.spout = spout;
         this.delegate = delegate;
@@ -77,6 +78,7 @@ public class SpoutOutputCollectorWrapper extends 
SpoutOutputCollector implements
         this.numOfRouterBolts = numGroupbyBolts;
         this.sds = sds;
         this.serializer = serializer;
+        this.logEventEnabled = logEventEnabled;
     }
 
     /**
@@ -118,7 +120,9 @@ public class SpoutOutputCollectorWrapper extends 
SpoutOutputCollector implements
         }
 
         StreamEvent event = convertToStreamEventByStreamDefinition((Long) 
convertedTuple.get(2), m, sds.get(streamId));
-        LOG.info("Spout from topic {} emit event: {}", topic, event);
+        if (logEventEnabled) {
+            LOG.info("Spout from topic {} emit event: {}", topic, event);
+        }
         
         /*
             phase 2: stream repartition

Reply via email to