METRON-1518 Build Failure When Using Profile HDP-2.5.0.0 (nickwallen) closes apache/metron#986
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/ed50d48b Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/ed50d48b Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/ed50d48b Branch: refs/heads/feature/METRON-1090-stellar-assignment Commit: ed50d48bb47cf3f301884f6e18fe4efc8c1b91f1 Parents: a8b555d Author: nickwallen <n...@nickallen.org> Authored: Tue Apr 10 17:16:20 2018 -0400 Committer: nickallen <nickal...@apache.org> Committed: Tue Apr 10 17:16:20 2018 -0400 ---------------------------------------------------------------------- .../profiler/bolt/ProfileBuilderBolt.java | 51 +++++--------------- 1 file changed, 11 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/ed50d48b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java index fb3d2d0..ca02b58 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java @@ -42,13 +42,11 @@ import org.apache.metron.stellar.common.utils.ConversionUtils; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.zookeeper.SimpleEventListener; import org.apache.metron.zookeeper.ZKCache; -import org.apache.storm.StormTimer; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseWindowedBolt; import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.Utils; import org.apache.storm.windowing.TupleWindow; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; @@ -59,9 +57,9 @@ import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Supplier; import static java.lang.String.format; import static org.apache.metron.profiler.bolt.ProfileSplitterBolt.ENTITY_TUPLE_FIELD; @@ -155,8 +153,8 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { private FlushSignal activeFlushSignal; /** - * A timer that flushes expired profiles on a regular interval. The expired profiles - * are flushed on a separate thread. + * An executor that flushes expired profiles at a regular interval on a separate + * thread. * * <p>Flushing expired profiles ensures that any profiles that stop receiving messages * for an extended period of time will continue to be flushed. @@ -164,7 +162,7 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { * <p>This introduces concurrency issues as the bolt is no longer single threaded. Due * to this, all access to the {@code MessageDistributor} needs to be protected. */ - private StormTimer expiredFlushTimer; + private transient ScheduledExecutorService flushExpiredExecutor; public ProfileBuilderBolt() { this.emitters = new ArrayList<>(); @@ -202,7 +200,7 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { this.configurations = new ProfilerConfigurations(); this.activeFlushSignal = new FixedFrequencyFlushSignal(periodDurationMillis); setupZookeeper(); - startExpiredFlushTimer(); + startFlushingExpiredProfiles(); } @Override @@ -210,7 +208,7 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { try { zookeeperCache.close(); zookeeperClient.close(); - expiredFlushTimer.close(); + flushExpiredExecutor.shutdown(); } catch(Throwable e) { LOG.error("Exception when cleaning up", e); @@ -421,39 +419,12 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { } /** - * Converts milliseconds to seconds and handles an ugly cast. - * - * @param millis Duration in milliseconds. - * @return Duration in seconds. - */ - private int toSeconds(long millis) { - return (int) TimeUnit.MILLISECONDS.toSeconds(millis); - } - - /** - * Creates a timer that regularly flushes expired profiles on a separate thread. - */ - private void startExpiredFlushTimer() { - - expiredFlushTimer = createTimer("flush-expired-profiles-timer"); - expiredFlushTimer.scheduleRecurring(0, toSeconds(profileTimeToLiveMillis), () -> flushExpired()); - } - - /** - * Creates a timer that can execute a task on a fixed interval. - * - * <p>If the timer encounters an exception, the entire process will be killed. - * - * @param name The name of the timer. - * @return The timer. + * Creates a separate thread that regularly flushes expired profiles. */ - private StormTimer createTimer(String name) { + private void startFlushingExpiredProfiles() { - return new StormTimer(name, (thread, exception) -> { - String msg = String.format("Unexpected exception in timer task; timer=%s", name); - LOG.error(msg, exception); - Utils.exitProcess(1, msg); - }); + flushExpiredExecutor = Executors.newSingleThreadScheduledExecutor(); + flushExpiredExecutor.scheduleAtFixedRate(() -> flushExpired(), 0, profileTimeToLiveMillis, TimeUnit.MILLISECONDS); } @Override