This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 609833c97b Do not emit negative lag because of stale offsets (#14292)
609833c97b is described below

commit 609833c97bb44fee50f4fb84e44ad7c6cb053714
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Wed Jul 5 14:44:23 2023 +0530

    Do not emit negative lag because of stale offsets (#14292)
    
    The latest topic offsets are polled frequently and used to determine the
lag based on the current offsets. However, when the offsets are stale (which
commonly happens because of connection issues), a negative lag may be computed.
    
    This PR prevents emission of lag metrics when the offsets are stale and at
least one of the partitions has a negative lag.
---
 .../supervisor/SeekableStreamSupervisor.java       | 15 +++++++++++++
 .../SeekableStreamSupervisorStateTest.java         | 25 ++++++++++++++++++++++
 2 files changed, 40 insertions(+)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 0ae9aad963..d99f84c574 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -4220,6 +4220,21 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           return;
         }
 
+        // Try emitting lag even with stale metrics provided that none of the 
partitions has negative lag
+        final long staleMillis = sequenceLastUpdated == null
+            ? 0
+            : DateTimes.nowUtc().getMillis()
+              - (tuningConfig.getOffsetFetchPeriod().getMillis() + 
sequenceLastUpdated.getMillis());
+        if (staleMillis > 0 && partitionLags.values().stream().anyMatch(x -> x 
< 0)) {
+          // Log at most once every twenty supervisor runs to reduce noise in 
the logs
+          if ((staleMillis / getIoConfig().getPeriod().getMillis()) % 20 == 0) 
{
+            log.warn("Lag is negative and will not be emitted because topic 
offsets have become stale. "
+                     + "This will not impact data processing. "
+                     + "Offsets may become stale because of connectivity 
issues.");
+          }
+          return;
+        }
+
         LagStats lagStats = computeLags(partitionLags);
         Map<String, Object> metricTags = 
spec.getContextValue(DruidMetrics.TAGS);
         for (Map.Entry<PartitionIdType, Long> entry : 
partitionLags.entrySet()) {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index b8d5a556ee..a347541a4e 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -62,6 +62,7 @@ import 
org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamState;
 import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -1035,6 +1036,30 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     );
   }
 
+  @Test
+  public void testStaleOffsetsNegativeLagNotEmitted() throws Exception
+  {
+    expectEmitterSupervisor(false);
+
+    CountDownLatch latch = new CountDownLatch(1);
+
+    final TestEmittingTestSeekableStreamSupervisor supervisor = new 
TestEmittingTestSeekableStreamSupervisor(
+        latch,
+        TestEmittingTestSeekableStreamSupervisor.LAG,
+        // Record lag must not be emitted
+        ImmutableMap.of("0", 10L, "1", -100L),
+        null
+    );
+    supervisor.start();
+    // Forcibly set the offsets to be stale
+    supervisor.sequenceLastUpdated = 
DateTimes.nowUtc().minus(Integer.MAX_VALUE);
+
+    latch.await();
+
+    supervisor.emitLag();
+    Assert.assertEquals(0, emitter.getEvents().size());
+  }
+
   private List<Event> filterMetrics(List<Event> events, List<String> whitelist)
   {
     List<Event> result = events.stream()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to