Github user mmiklavc commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1174#discussion_r215785684

    --- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java ---
    @@ -127,94 +127,53 @@
       @Multiline
       private static String kryoSerializers;
    -  /**
    -   * The Profiler can generate profiles based on processing time. With processing time,
    -   * the Profiler builds profiles based on when the telemetry is processed.
    -   *
    -   * <p>Not defining a 'timestampField' within the Profiler configuration tells the Profiler
    -   * to use processing time.
    -   *
    -   * <p>There are two mechanisms that will cause a profile to flush.
    -   *
    -   * (1) As new messages arrive, time is advanced. The splitter bolt attaches a timestamp to each
    -   * message (which can be either event or system time.) This advances time and leads to profile
    -   * measurements being flushed.
    -   *
    -   * (2) If no messages arrive to advance time, then the "time-to-live" mechanism will flush a profile
    -   * after a period of time.
    -   *
    -   * <p>This test specifically tests the *first* mechanism where time is advanced by incoming messages.
    -   */
       @Test
       public void testProcessingTime() throws Exception {
    +    uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/processing-time-test");
    -    // upload the config to zookeeper
    -    uploadConfig(TEST_RESOURCES + "/config/zookeeper/processing-time-test");
    -
    -    // start the topology and write test messages to kafka
    +    // start the topology and write 3 test messages to kafka
         fluxComponent.submitTopology();
    -
    -    // the messages that will be applied to the profile
         kafkaComponent.writeMessages(inputTopic, message1);
         kafkaComponent.writeMessages(inputTopic, message2);
         kafkaComponent.writeMessages(inputTopic, message3);
         // retrieve the profile measurement using PROFILE_GET
         String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
    -    List<Integer> actuals = execute(profileGetExpression, List.class);
    +    List<Integer> measurements = execute(profileGetExpression, List.class);
    -    // storm needs at least one message to close its event window
    +    // need to keep checking for measurements until the profiler has flushed one out
         int attempt = 0;
    -    while(actuals.size() == 0 && attempt++ < 10) {
    +    while(measurements.size() == 0 && attempt++ < 10) {
           // wait for the profiler to flush
           long sleep = windowDurationMillis;
           LOG.debug("Waiting {} millis for profiler to flush", sleep);
           Thread.sleep(sleep);
    -      // write another message to advance time. this ensures that we are testing the 'normal' flush mechanism.
    +      // write another message to advance time. this ensures we are testing the 'normal' flush mechanism.
           // if we do not send additional messages to advance time, then it is the profile TTL mechanism which
           // will ultimately flush the profile
           kafkaComponent.writeMessages(inputTopic, message2);
    -      // retrieve the profile measurement using PROFILE_GET
    -      actuals = execute(profileGetExpression, List.class);
    +      // try again to retrieve the profile measurement using PROFILE_GET
    +      measurements = execute(profileGetExpression, List.class);
         }
    -    // the profile should count at least 3 messages
    -    assertTrue(actuals.size() > 0);
    -    assertTrue(actuals.get(0) >= 3);
    +    // expect to see only 1 measurement, but could be more (one for each period) depending on
    --- End diff --

    Ok, that makes sense - that's more what I meant when I said "period" in my original inquiry. It's a measurement that is associated with the k-th period.
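    To make "the measurement associated with the k-th period" concrete, here is a minimal sketch. It assumes periods are fixed-width windows counted from the epoch, so a timestamp maps to period k by integer division. The class `PeriodSketch`, its methods, and the 5-minute duration are hypothetical and chosen for illustration only; they are not taken from the Profiler code in this PR.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not the Profiler's actual classes.
public class PeriodSketch {

    // Index k of the fixed-width period that contains the given timestamp.
    static long periodIndex(long epochMillis, long duration, TimeUnit units) {
        long periodMillis = units.toMillis(duration);
        return Math.floorDiv(epochMillis, periodMillis);
    }

    // Start time (epoch millis) of the k-th period.
    static long periodStart(long k, long duration, TimeUnit units) {
        return k * units.toMillis(duration);
    }

    public static void main(String[] args) {
        // Assumed period duration of 5 minutes (illustrative value).
        long t1 = TimeUnit.MINUTES.toMillis(16);
        long t2 = TimeUnit.MINUTES.toMillis(19);
        long t3 = TimeUnit.MINUTES.toMillis(21);

        // t1 and t2 fall in the same period, so they would contribute to one
        // measurement; t3 falls in the next period, giving a second one.
        System.out.println(periodIndex(t1, 5, TimeUnit.MINUTES)); // prints 3
        System.out.println(periodIndex(t2, 5, TimeUnit.MINUTES)); // prints 3
        System.out.println(periodIndex(t3, 5, TimeUnit.MINUTES)); // prints 4
    }
}
```

    Under that assumption, the test can see more than one measurement whenever the messages it writes straddle a period boundary, which is why an "at least one" check is safer than "exactly one".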
---