METRON-1775 Transient exception could prevent expired profiles from being flushed (nickwallen) closes apache/metron#1194


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/2720e537
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/2720e537
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/2720e537

Branch: refs/heads/feature/METRON-1699-create-batch-profiler
Commit: 2720e537e698fdb278757c1e0fb968d5887c9d90
Parents: f5f765c
Author: nickwallen <n...@nickallen.org>
Authored: Thu Sep 13 08:52:16 2018 -0400
Committer: nickallen <nickal...@apache.org>
Committed: Thu Sep 13 08:52:16 2018 -0400

----------------------------------------------------------------------
 .../profiler/bolt/ProfileBuilderBolt.java       | 20 +++++++++++-------
 .../profiler/bolt/ProfileBuilderBoltTest.java   | 22 ++++++++++++++++++++
 2 files changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/2720e537/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index 6c22b45..e683a84 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -20,6 +20,7 @@
 
 package org.apache.metron.profiler.bolt;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -361,7 +362,6 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
     }
 
     LOG.debug("Flushed active profiles and found {} measurement(s).", measurements.size());
-
   }
 
   /**
@@ -372,15 +372,21 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
    * that their state is not lost.
    */
   protected void flushExpired() {
+    List<ProfileMeasurement> measurements = null;
+    try {
+      // flush the expired profiles
+      synchronized (messageDistributor) {
+        measurements = messageDistributor.flushExpired();
+        emitMeasurements(measurements);
+      }
 
-    // flush the expired profiles
-    List<ProfileMeasurement> measurements;
-    synchronized (messageDistributor) {
-      measurements = messageDistributor.flushExpired();
-      emitMeasurements(measurements);
+    } catch(Throwable t) {
+      // need to catch the exception, otherwise subsequent executions would be suppressed.
+      // see java.util.concurrent.ScheduledExecutorService#scheduleAtFixedRate
+      LOG.error("Failed to flush expired profiles", t);
     }
 
-    LOG.debug("Flushed expired profiles and found {} measurement(s).", measurements.size());
+    LOG.debug("Flushed expired profiles and found {} measurement(s).", CollectionUtils.size(measurements));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/metron/blob/2720e537/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
index 3d009fb..9108d53 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
@@ -50,6 +50,7 @@ import java.util.stream.Collectors;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -247,6 +248,27 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
     verify(outputCollector, times(1)).emit(eq("destination3"), any());
   }
 
+  @Test
+  public void testExceptionWhenFlushingExpiredProfiles() throws Exception {
+    // create an emitter that will throw an exception when emit() called
+    ProfileMeasurementEmitter badEmitter = mock(ProfileMeasurementEmitter.class);
+    doThrow(new RuntimeException("flushExpired() should catch this exception"))
+            .when(badEmitter)
+            .emit(any(), any());
+
+    // create a distributor that will return a profile measurement
+    MessageDistributor distributor = mock(MessageDistributor.class);
+    when(distributor.flushExpired()).thenReturn(Collections.singletonList(measurement));
+
+    // the bolt will use the bad emitter when flushExpired() is called
+    ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt()
+            .withEmitter(badEmitter)
+            .withMessageDistributor(distributor);
+
+    // the exception thrown by the emitter should not bubble up
+    bolt.flushExpired();
+  }
+
   /**
    * Retrieves the ProfileMeasurement(s) (if any) that have been emitted.
    *

Reply via email to