Github user nickwallen commented on a diff in the pull request: https://github.com/apache/metron/pull/1174#discussion_r215621040 --- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java --- @@ -150,27 +159,98 @@ public void testProcessingTime() throws Exception { kafkaComponent.writeMessages(inputTopic, message2); kafkaComponent.writeMessages(inputTopic, message3); + // retrieve the profile measurement using PROFILE_GET + String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))"; + List<Integer> actuals = execute(profileGetExpression, List.class); + // storm needs at least one message to close its event window int attempt = 0; - while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) { + while(actuals.size() == 0 && attempt++ < 10) { - // sleep, at least beyond the current window - Thread.sleep(windowDurationMillis + windowLagMillis); + // wait for the profiler to flush + long sleep = windowDurationMillis; + LOG.debug("Waiting {} millis for profiler to flush", sleep); + Thread.sleep(sleep); - // send another message to help close the current event window + // write another message to advance time. this ensures that we are testing the 'normal' flush mechanism. + // if we do not send additional messages to advance time, then it is the profile TTL mechanism which + // will ultimately flush the profile kafkaComponent.writeMessages(inputTopic, message2); + + // retrieve the profile measurement using PROFILE_GET + actuals = execute(profileGetExpression, List.class); } - // validate what was flushed - List<Integer> actuals = read( - profilerTable.getPutLog(), - columnFamily, - columnBuilder.getColumnQualifier("value"), - Integer.class); - assertEquals(1, actuals.size()); + // the profile should count at least 3 messages + assertTrue(actuals.size() > 0); assertTrue(actuals.get(0) >= 3); } + /** + * The Profiler can generate profiles based on processing time. With processing time, + * the Profiler builds profiles based on when the telemetry is processed. + * + * <p>Not defining a 'timestampField' within the Profiler configuration tells the Profiler + * to use processing time. + * + * <p>There are two mechanisms that will cause a profile to flush. + * + * (1) As new messages arrive, time is advanced. The splitter bolt attaches a timestamp to each + * message (which can be either event or system time.) This advances time and leads to profile + * measurements being flushed. + * + * (2) If no messages arrive to advance time, then the "time to live" mechanism will flush a profile + * after a period of time. + * + * <p>This test specifically tests the *second* mechanism when a profile is flushed by the + * "time to live" mechanism. + */ + @Test + public void testProcessingTimeWithTimeToLiveFlush() throws Exception { --- End diff -- Ok, fair point. Let me see if I can't move this to the README and clean it up.
---