METRON-1775 Transient exception could prevent expired profiles from being flushed (nickwallen) closes apache/metron#1194
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/2720e537 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/2720e537 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/2720e537 Branch: refs/heads/feature/METRON-1699-create-batch-profiler Commit: 2720e537e698fdb278757c1e0fb968d5887c9d90 Parents: f5f765c Author: nickwallen <n...@nickallen.org> Authored: Thu Sep 13 08:52:16 2018 -0400 Committer: nickallen <nickal...@apache.org> Committed: Thu Sep 13 08:52:16 2018 -0400 ---------------------------------------------------------------------- .../profiler/bolt/ProfileBuilderBolt.java | 20 +++++++++++------- .../profiler/bolt/ProfileBuilderBoltTest.java | 22 ++++++++++++++++++++ 2 files changed, 35 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/2720e537/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java index 6c22b45..e683a84 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java @@ -20,6 +20,7 @@ package org.apache.metron.profiler.bolt; +import org.apache.commons.collections4.CollectionUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -361,7 +362,6 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { } LOG.debug("Flushed active profiles and found {} measurement(s).", measurements.size()); - } /** @@ -372,15 +372,21 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { * that their state is not lost. */ protected void flushExpired() { + List<ProfileMeasurement> measurements = null; + try { + // flush the expired profiles + synchronized (messageDistributor) { + measurements = messageDistributor.flushExpired(); + emitMeasurements(measurements); + } - // flush the expired profiles - List<ProfileMeasurement> measurements; - synchronized (messageDistributor) { - measurements = messageDistributor.flushExpired(); - emitMeasurements(measurements); + } catch(Throwable t) { + // need to catch the exception, otherwise subsequent executions would be suppressed. + // see java.util.concurrent.ScheduledExecutorService#scheduleAtFixedRate + LOG.error("Failed to flush expired profiles", t); } - LOG.debug("Flushed expired profiles and found {} measurement(s).", measurements.size()); + LOG.debug("Flushed expired profiles and found {} measurement(s).", CollectionUtils.size(measurements)); } /** http://git-wip-us.apache.org/repos/asf/metron/blob/2720e537/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java index 3d009fb..9108d53 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java @@ -50,6 +50,7 @@ import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -247,6 +248,27 @@ public class ProfileBuilderBoltTest extends BaseBoltTest { verify(outputCollector, times(1)).emit(eq("destination3"), any()); } + @Test + public void testExceptionWhenFlushingExpiredProfiles() throws Exception { + // create an emitter that will throw an exception when emit() called + ProfileMeasurementEmitter badEmitter = mock(ProfileMeasurementEmitter.class); + doThrow(new RuntimeException("flushExpired() should catch this exception")) + .when(badEmitter) + .emit(any(), any()); + + // create a distributor that will return a profile measurement + MessageDistributor distributor = mock(MessageDistributor.class); + when(distributor.flushExpired()).thenReturn(Collections.singletonList(measurement)); + + // the bolt will use the bad emitter when flushExpired() is called + ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt() + .withEmitter(badEmitter) + .withMessageDistributor(distributor); + + // the exception thrown by the emitter should not bubble up + bolt.flushExpired(); + } + /** * Retrieves the ProfileMeasurement(s) (if any) that have been emitted. *