This is an automated email from the ASF dual-hosted git repository.

nickallen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/master by this push:
     new fec36c7  METRON-1778 Out-of-order timestamps may delay flush in Storm 
Profiler (nickwallen) closes apache/metron#1197
fec36c7 is described below

commit fec36c7ed0b2d29f4dc48574adf02c7fcb13e5fb
Author: nickwallen <n...@nickallen.org>
AuthorDate: Tue Feb 26 14:34:39 2019 -0500

    METRON-1778 Out-of-order timestamps may delay flush in Storm Profiler 
(nickwallen) closes apache/metron#1197
---
 .../profiler/storm/FixedFrequencyFlushSignal.java  |  82 ++++++-----
 .../storm/FixedFrequencyFlushSignalTest.java       | 151 +++++++++++++++++++--
 2 files changed, 179 insertions(+), 54 deletions(-)

diff --git 
a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignal.java
 
b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignal.java
index 02503c2..db4f99b 100644
--- 
a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignal.java
+++ 
b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignal.java
@@ -32,14 +32,14 @@ public class FixedFrequencyFlushSignal implements 
FlushSignal {
   protected static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   /**
-   * The latest known timestamp.
+   * Tracks the min timestamp.
    */
-  private long currentTime;
+  private long minTime;
 
   /**
-   * The time when the next flush should occur.
+   * Tracks the max timestamp.
    */
-  private long flushTime;
+  private long maxTime;
 
   /**
    * The amount of time between flushes in milliseconds.
@@ -47,7 +47,6 @@ public class FixedFrequencyFlushSignal implements FlushSignal 
{
   private long flushFrequency;
 
   public FixedFrequencyFlushSignal(long flushFrequencyMillis) {
-
     if(flushFrequencyMillis < 0) {
       throw new IllegalArgumentException("flush frequency must be >= 0");
     }
@@ -61,9 +60,8 @@ public class FixedFrequencyFlushSignal implements FlushSignal 
{
    */
   @Override
   public void reset() {
-    flushTime = 0;
-    currentTime = 0;
-
+    minTime = Long.MAX_VALUE;
+    maxTime = Long.MIN_VALUE;
     LOG.debug("Flush counters reset");
   }
 
@@ -74,31 +72,34 @@ public class FixedFrequencyFlushSignal implements 
FlushSignal {
    */
   @Override
   public void update(long timestamp) {
+    if(LOG.isWarnEnabled()) {
+      checkIfOutOfOrder(timestamp);
+    }
 
-    if(timestamp > currentTime) {
-
-      // need to update current time
-      LOG.debug("Updating current time; last={}, new={}", currentTime, 
timestamp);
-      currentTime = timestamp;
-
-    } else if ((currentTime - timestamp) > flushFrequency) {
-
-      // significantly out-of-order timestamps
-      LOG.warn("Timestamps out-of-order by '{}' ms. This may indicate a 
problem in the data. last={}, current={}",
-              (currentTime - timestamp),
-              timestamp,
-              currentTime);
+    if(timestamp < minTime) {
+      minTime = timestamp;
     }
 
-    if(flushTime == 0) {
+    if(timestamp > maxTime) {
+      maxTime = timestamp;
+    }
+  }
 
-      // set the next time to flush
-      flushTime = currentTime + flushFrequency;
-      LOG.debug("Setting flush time; '{}' ms until flush; flushTime={}, 
currentTime={}, flushFreq={}",
-              timeToNextFlush(),
-              flushTime,
-              currentTime,
-              flushFrequency);
+  /**
+   * Checks if the timestamp is significantly out-of-order.
+   *
+   * @param timestamp The last timestamp.
+   */
+  private void checkIfOutOfOrder(long timestamp) {
+    // do not warn if this is the first timestamp we've seen, which will 
always be 'out-of-order'
+    if (maxTime > Long.MIN_VALUE) {
+
+      long outOfOrderBy = maxTime - timestamp;
+      if (Math.abs(outOfOrderBy) > flushFrequency) {
+        LOG.warn("Timestamp out-of-order by {} ms. This may indicate a problem 
in the data. " +
+                        "timestamp={}, maxKnown={}, flushFreq={} ms",
+                outOfOrderBy, timestamp, maxTime, flushFrequency);
+      }
     }
   }
 
@@ -109,27 +110,20 @@ public class FixedFrequencyFlushSignal implements 
FlushSignal {
    */
   @Override
   public boolean isTimeToFlush() {
+    boolean flush = false;
 
-    boolean flush = currentTime > flushTime;
-    LOG.debug("Flush={}, '{}' ms until flush; currentTime={}, flushTime={}",
-            flush,
-            timeToNextFlush(),
-            currentTime,
-            flushTime);
+    long flushTime = minTime + flushFrequency;
+    if(maxTime >= flushTime) {
+      flush = true;
+    }
 
+    LOG.debug("'{}' ms until flush; flush?={}, minTime={}, maxTime={}, 
flushTime={}",
+            Math.max(0, flushTime - maxTime), flush, minTime, maxTime, 
flushTime);
     return flush;
   }
 
   @Override
   public long currentTimeMillis() {
-    return currentTime;
-  }
-
-  /**
-   * Returns the number of milliseconds to the next flush.
-   * @return The time left until the next flush.
-   */
-  private long timeToNextFlush() {
-    return Math.max(0, flushTime - currentTime);
+    return maxTime;
   }
 }
diff --git 
a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignalTest.java
 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignalTest.java
index 8b8813b..a753dc4 100644
--- 
a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignalTest.java
+++ 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignalTest.java
@@ -31,41 +31,172 @@ public class FixedFrequencyFlushSignalTest {
 
   @Test
   public void testSignalFlush() {
+    int flushFreq = 1000;
+    FixedFrequencyFlushSignal signal = new 
FixedFrequencyFlushSignal(flushFreq);
 
-    FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(1000);
+    // not ready to flush; we have not seen any messages yet
+    assertFalse(signal.isTimeToFlush());
 
-    // not time to flush yet
+    // not ready to flush; flushTime = min + flushFreq = 5000 + 1000 = 6000; 
flush if anything >= flushTime
+    signal.update(5000);
     assertFalse(signal.isTimeToFlush());
 
-    // advance time
+    // ready to flush; flushTime = min + flushFreq = 5000 + 1000 = 6000; 
max(5000,7000) >= 6000
+    signal.update(7000);
+    assertTrue(signal.isTimeToFlush());
+  }
+
+  @Test
+  public void testOutOfOrderTimestamps() {
+    int flushFreq = 5000;
+    FixedFrequencyFlushSignal signal = new 
FixedFrequencyFlushSignal(flushFreq);
+
+    // not ready to flush; flushTime = min + flushFreq = 5000 + 5000 = 10000; 
flush if anything >= flushTime
     signal.update(5000);
+    assertFalse(signal.isTimeToFlush());
 
-    // not time to flush yet
+    // not ready to flush; flushTime = min + flushFreq = 1000 + 5000 = 6000
+    signal.update(1000);
     assertFalse(signal.isTimeToFlush());
 
-    // advance time
+    // ready to flush; flushTime = min + flushFreq = 1000 + 5000 = 6000; 
max(5000,1000,7000) >= 6000
     signal.update(7000);
+    assertTrue(signal.isTimeToFlush());
+
+    // ready to flush, still
+    signal.update(3000);
+    assertTrue(signal.isTimeToFlush());
 
-    // time to flush
+    // ready to flush, still
     assertTrue(signal.isTimeToFlush());
   }
 
   @Test
-  public void testOutOfOrderTimestamps() {
-    FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(1000);
+  public void testOutOfOrderTimestampsNoFlush() {
+    int flushFreq = 7000;
+    FixedFrequencyFlushSignal signal = new 
FixedFrequencyFlushSignal(flushFreq);
 
-    // advance time, out-of-order
+    // not ready to flush; flushTime = min + flushFreq = 5000 + 7000 = 12000; 
flush if anything >= flushTime
     signal.update(5000);
+    assertFalse(signal.isTimeToFlush());
+
+    // not ready to flush; flushTime = min + flushFreq = 1000 + 7000 = 8000
     signal.update(1000);
+    assertFalse(signal.isTimeToFlush());
+
+    // not ready to flush; flushTime = min + flushFreq = 1000 + 7000 = 8000
     signal.update(7000);
+    assertFalse(signal.isTimeToFlush());
+
+    // not ready to flush; flushTime = min + flushFreq = 1000 + 7000 = 8000; 
max(5000,1000,7000,3000) < 8000
+    signal.update(3000);
+    assertFalse(signal.isTimeToFlush());
+  }
+  
+  @Test
+  public void testTimestampsDescending() {
+    int flushFreq = 3000;
+    FixedFrequencyFlushSignal signal = new 
FixedFrequencyFlushSignal(flushFreq);
+
+    // not ready to flush; flushTime = min + flushFreq = 4100 + 3000 = 7100; 
flush if anything >= flushTime
+    signal.update(4100);
+    assertFalse(signal.isTimeToFlush());
+
+    // not ready to flush; flushTime = min + flushFreq = 3000 + 3000 = 6000
     signal.update(3000);
+    assertFalse(signal.isTimeToFlush());
 
-    // need to flush @ 5000 + 1000 = 6000. if anything > 6000 (even 
out-of-order), then it should signal a flush
+    // not ready to flush; flushTime = min + flushFreq = 2000 + 3000 = 5000
+    signal.update(2000);
+    assertFalse(signal.isTimeToFlush());
+
+    // ready to flush; flushTime = min + flushFreq = 1000 + 3000 = 4000; 
max(4100,3000,2000,1000) >= 4000
+    signal.update(1000);
     assertTrue(signal.isTimeToFlush());
   }
 
+  @Test
+  public void testTimestampsDescendingNoFlush() {
+    int flushFreq = 4000;
+    FixedFrequencyFlushSignal signal = new 
FixedFrequencyFlushSignal(flushFreq);
+
+    // not ready to flush; flushTime = min + flushFreq = 4000 + 4000 = 8000; 
flush if anything >= flushTime
+    signal.update(4000);
+    assertFalse(signal.isTimeToFlush());
+
+    // not ready to flush; flushTime = min + flushFreq = 3000 + 4000 = 7000
+    signal.update(3000);
+    assertFalse(signal.isTimeToFlush());
+
+    // not ready to flush; flushTime = min + flushFreq = 2000 + 4000 = 6000
+    signal.update(2000);
+    assertFalse(signal.isTimeToFlush());
+
+    // not ready to flush; flushTime = min + flushFreq = 1000 + 4000 = 5000
+    signal.update(1000);
+    assertFalse(signal.isTimeToFlush());
+  }
+
+  @Test
+  public void testTimestampsAscending() {
+    int flushFreq = 3000;
+    FixedFrequencyFlushSignal signal = new 
FixedFrequencyFlushSignal(flushFreq);
+
+    // not ready to flush; flushTime = min + flushFreq = 1000 + 3000 = 4000; 
flush if anything >= flushTime
+    signal.update(1000);
+    assertFalse(signal.isTimeToFlush());
+
+    // not ready to flush; flushTime = min + flushFreq = 1000 + 3000 = 4000
+    signal.update(2000);
+    assertFalse(signal.isTimeToFlush());
+
+    // not ready to flush; flushTime = min + flushFreq = 1000 + 3000 = 4000
+    signal.update(3000);
+    assertFalse(signal.isTimeToFlush());
+
+    // ready to flush; flushTime = min + flushFreq = 1000 + 3000 = 4000; 
max(1000,2000,3000,4000) >= 4000
+    signal.update(4000);
+  }
+
+  @Test
+  public void testTimestampsAscendingNoFlush() {
+    int flushFreq = 4000;
+    FixedFrequencyFlushSignal signal = new 
FixedFrequencyFlushSignal(flushFreq);
+
+    // not ready to flush; flushTime = min + flushFreq = 1000 + 4000 = 5000; 
flush if anything >= flushTime
+    signal.update(1000);
+    assertFalse(signal.isTimeToFlush());
+
+    // not ready to flush; flushTime = min + flushFreq = 1000 + 4000 = 5000
+    signal.update(2000);
+    assertFalse(signal.isTimeToFlush());
+
+    // not ready to flush; flushTime = min + flushFreq = 1000 + 4000 = 5000
+    signal.update(3000);
+    assertFalse(signal.isTimeToFlush());
+
+    // not ready to flush; flushTime = min + flushFreq = 1000 + 4000 = 5000
+    signal.update(4000);
+    assertFalse(signal.isTimeToFlush());
+  }
+
   @Test(expected = IllegalArgumentException.class)
   public void testNegativeFrequency() {
+    // a negative flush frequency makes no sense
     new FixedFrequencyFlushSignal(-1000);
   }
+
+  @Test
+  public void testReset() {
+    FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(4000);
+    signal.update(1000);
+    signal.update(6000);
+
+    // ready to flush; flushTime = min + flushFreq = 1000 + 4000 = 5000; 
max(1000,6000) >= 5000
+    assertTrue(signal.isTimeToFlush());
+
+    // reset should turn off the flush signal
+    signal.reset();
+    assertFalse(signal.isTimeToFlush());
+  }
 }

Reply via email to