kgyrtkirk commented on code in PR #18255:
URL: https://github.com/apache/druid/pull/18255#discussion_r2207711845


##########
processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java:
##########
@@ -74,14 +74,20 @@ public List<Event> getEvents()
     return new ArrayList<>(events);
   }
 
+  public int getNumEmittedEvents()
+  {
+    return events.size();
+  }
+
   /**
-   * Gets all the metric events emitted since the previous {@link #flush()}.
+   * Gets all the metric events emitted for the given metric name since the previous {@link #flush()}.
    *
-   * @return Map from metric name to list of events emitted for that metric.
+   * @return List of events emitted for the given metric.
    */
-  public Map<String, Queue<ServiceMetricEventSnapshot>> getMetricEvents()
+  public List<ServiceMetricEventSnapshot> getMetricEvents(String metricName)

Review Comment:
   This is exactly what I should have done in the previous PR, but I rushed to
   get it running.



##########
server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java:
##########
@@ -987,10 +986,11 @@ public void testMetricsWithMaxSubqueryRowsEnabled()
         ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
     );
 
-    List<Event> events = emitter.getEvents();
+    List<StubServiceEmitter.ServiceMetricEventSnapshot> events =
+        emitter.getMetricEvents(ClientQuerySegmentWalker.ROWS_COUNT_METRIC);
 
-    for (Event event : events) {
-      EventMap map = event.toMap();
+    for (StubServiceEmitter.ServiceMetricEventSnapshot event : events) {

Review Comment:
   nit: right now there are both `event.getUserDims()` and
   `event.getMetricEvent().getUserDims()`. I don't clearly understand why
   `ServiceMetricEventSnapshot` is necessary; if the concern is that `userDims`
   or other fields might be clobbered externally, it would be more
   straightforward to just add a copy constructor to `ServiceMetricEvent`.
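
   A minimal sketch of the copy-constructor idea, assuming a simplified,
   hypothetical `MetricEventSketch` class (it is not Druid's actual
   `ServiceMetricEvent`, whose fields and constructors may differ). The primary
   constructor deliberately keeps a reference to the caller's map to mimic
   dimensions being clobbered externally; the copy constructor takes an
   independent snapshot.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a metric event; not Druid's ServiceMetricEvent.
final class MetricEventSketch
{
  private final String metric;
  private final Number value;
  private final Map<String, Object> userDims;

  // Keeps a reference to the caller's map, so external changes remain visible.
  MetricEventSketch(String metric, Number value, Map<String, Object> userDims)
  {
    this.metric = metric;
    this.value = value;
    this.userDims = userDims;
  }

  // Copy constructor: copies the dimensions so later external changes are not seen.
  MetricEventSketch(MetricEventSketch other)
  {
    this(other.metric, other.value, new HashMap<>(other.userDims));
  }

  String getMetric()
  {
    return metric;
  }

  Number getValue()
  {
    return value;
  }

  Map<String, Object> getUserDims()
  {
    return Collections.unmodifiableMap(userDims);
  }
}
```

   With something like this, a test emitter could store `new
   MetricEventSketch(event)` instead of wrapping the event in a separate
   snapshot type.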
   
   



##########
services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java:
##########
@@ -683,7 +683,8 @@ protected void doService(
     }
     catch (NullPointerException ignored) {
     }
-    Assert.assertEquals("query/time", stubServiceEmitter.getEvents().get(0).toMap().get("metric"));
+    // Assert.assertEquals("query/time", stubServiceEmitter.getEvents().get(0).toMap().get("metric"));

Review Comment:
   nit: commented-out old code



##########
server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java:
##########
@@ -1038,23 +1038,26 @@ public void testMetricsWithMaxSubqueryBytesEnabled()
         ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
     );
 
-    List<Event> events = emitter.getEvents();
+    List<StubServiceEmitter.ServiceMetricEventSnapshot> events
+        = emitter.getMetricEvents(ClientQuerySegmentWalker.ROWS_COUNT_METRIC);

Review Comment:
   nit: this could be inlined into the `for` loop.



##########
server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java:
##########
@@ -60,7 +60,13 @@ public class LatchableEmitter extends StubServiceEmitter
    */
   private final ScheduledExecutorService conditionEvaluateExecutor;
   private final Set<WaitCondition> waitConditions = new HashSet<>();
-  private final ReentrantReadWriteLock eventReadWriteLock = new ReentrantReadWriteLock(true);
+
+  private final ReentrantLock eventProcessingLock = new ReentrantLock();
+
+  /**
+   * Lists of events that have already been processed by {@link #evaluateWaitConditions(Event)}.
+   */
+  private final List<Event> processedEvents = new ArrayList<>();

Review Comment:
   `clear` is never called on this field, not even during `flush`.
   Could that cause problems?
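
   A minimal sketch of one possible answer, assuming that `flush()` is meant to
   reset the processed-event history as well. The class and method names here
   are hypothetical stand-ins and events are plain strings; whether the real
   `LatchableEmitter` should behave this way is exactly the open question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified emitter; not the actual LatchableEmitter.
final class ProcessedEventsSketch
{
  private final ReentrantLock eventProcessingLock = new ReentrantLock();
  private final List<String> processedEvents = new ArrayList<>();

  // Records an event as processed while holding the lock.
  void recordProcessed(String event)
  {
    eventProcessingLock.lock();
    try {
      processedEvents.add(event);
    }
    finally {
      eventProcessingLock.unlock();
    }
  }

  // Assumption: flushing also drops the processed-event history, under the same lock.
  void flush()
  {
    eventProcessingLock.lock();
    try {
      processedEvents.clear();
    }
    finally {
      eventProcessingLock.unlock();
    }
  }

  int getNumProcessedEvents()
  {
    eventProcessingLock.lock();
    try {
      return processedEvents.size();
    }
    finally {
      eventProcessingLock.unlock();
    }
  }
}
```

   Without some form of clearing, the list grows for the lifetime of the
   emitter, and conditions registered after a `flush` would still be evaluated
   against events emitted before it.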



##########
server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java:
##########
@@ -158,48 +144,64 @@ public void waitForEventAggregate(
     );
   }
 
-  private void triggerConditionEvaluations()
+  private void triggerConditionEvaluations(Event event)
   {
     if (conditionEvaluateExecutor == null) {
       throw new ISE("Cannot evaluate conditions as the 'conditionEvaluateExecutor' is null.");
     } else {
-      conditionEvaluateExecutor.submit(this::evaluateWaitConditions);
+      conditionEvaluateExecutor.submit(() -> evaluateWaitConditions(event));
     }
   }
 
   /**
    * Evaluates wait conditions. This method must be invoked on the
    * {@link #conditionEvaluateExecutor} so that it does not block {@link #emit(Event)}.
    */
-  private void evaluateWaitConditions()
+  private void evaluateWaitConditions(Event event)
   {
-    eventReadWriteLock.readLock().lock();
+    eventProcessingLock.lock();
     try {
       // Create a copy of the conditions for thread-safety
       final List<WaitCondition> conditionsToEvaluate = List.copyOf(waitConditions);
       if (conditionsToEvaluate.isEmpty()) {
         return;
       }
 
-      List<Event> events = getEvents();
       for (WaitCondition condition : conditionsToEvaluate) {
-        final int currentNumberOfEvents = events.size();
-
-        // Do not use an iterator over the list to avoid concurrent modification exceptions
-        // Evaluate new events against this condition
-        for (int i = condition.processedUntil; i < currentNumberOfEvents; ++i) {
-          if (condition.predicate.test(events.get(i))) {
-            condition.countDownLatch.countDown();
-          }
+        if (condition.predicate.test(event)) {
+          condition.countDownLatch.countDown();
         }
-        condition.processedUntil = currentNumberOfEvents;
       }
     }
     catch (Exception e) {
       log.error(e, "Error while evaluating wait conditions");

Review Comment:
   This is partially unrelated, but I think this should be treated as a serious
   error (possibly the predicate is broken?). Rethrow it wrapped in a
   `RuntimeException` if necessary?
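
   A minimal sketch of rethrowing instead of only logging, assuming a
   hypothetical helper `evaluate` that wraps a single predicate call; it is not
   part of the PR.

```java
import java.util.function.Predicate;

// Hypothetical helper; illustrates surfacing a broken predicate as a failure.
final class ConditionEvaluationSketch
{
  private ConditionEvaluationSketch()
  {
  }

  static <T> boolean evaluate(Predicate<T> predicate, T event)
  {
    try {
      return predicate.test(event);
    }
    catch (Exception e) {
      // Wrap and rethrow so that the failure is not silently swallowed by a log line.
      throw new RuntimeException("Error while evaluating wait condition", e);
    }
  }
}
```

   One caveat: when the evaluation runs through `ExecutorService.submit`, an
   exception thrown by the task is captured in the returned `Future` and only
   surfaces if something calls `get()` on it, so rethrowing alone may not be
   enough to fail a test.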



##########
server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java:
##########
@@ -107,9 +93,9 @@ public void close()
   public void waitForEvent(Predicate<Event> condition, long timeoutMillis)
   {
     final WaitCondition waitCondition = new WaitCondition(condition);
+    registerWaitCondition(waitCondition);
     waitConditions.add(waitCondition);

Review Comment:
   nit: `registerWaitCondition` already adds it to `waitConditions`, so this
   second `add` looks redundant.



##########
server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java:
##########
@@ -158,48 +144,64 @@ public void waitForEventAggregate(
     );
   }
 
-  private void triggerConditionEvaluations()
+  private void triggerConditionEvaluations(Event event)
   {
     if (conditionEvaluateExecutor == null) {
       throw new ISE("Cannot evaluate conditions as the 'conditionEvaluateExecutor' is null.");
     } else {
-      conditionEvaluateExecutor.submit(this::evaluateWaitConditions);
+      conditionEvaluateExecutor.submit(() -> evaluateWaitConditions(event));
     }
   }
 
   /**
    * Evaluates wait conditions. This method must be invoked on the
    * {@link #conditionEvaluateExecutor} so that it does not block {@link #emit(Event)}.
    */
-  private void evaluateWaitConditions()
+  private void evaluateWaitConditions(Event event)
   {
-    eventReadWriteLock.readLock().lock();
+    eventProcessingLock.lock();
     try {
       // Create a copy of the conditions for thread-safety
       final List<WaitCondition> conditionsToEvaluate = List.copyOf(waitConditions);
       if (conditionsToEvaluate.isEmpty()) {
         return;
       }
 
-      List<Event> events = getEvents();
       for (WaitCondition condition : conditionsToEvaluate) {
-        final int currentNumberOfEvents = events.size();
-
-        // Do not use an iterator over the list to avoid concurrent modification exceptions
-        // Evaluate new events against this condition
-        for (int i = condition.processedUntil; i < currentNumberOfEvents; ++i) {
-          if (condition.predicate.test(events.get(i))) {
-            condition.countDownLatch.countDown();
-          }
+        if (condition.predicate.test(event)) {
+          condition.countDownLatch.countDown();
         }
-        condition.processedUntil = currentNumberOfEvents;
       }
     }
     catch (Exception e) {
       log.error(e, "Error while evaluating wait conditions");
     }
     finally {
-      eventReadWriteLock.readLock().unlock();
+      processedEvents.add(event);
+      eventProcessingLock.unlock();
+    }
+  }
+
+  /**
+   * Evaluates the given new condition for all past events and then adds it to
+   * {@link #waitConditions}.
+   */
+  private void registerWaitCondition(WaitCondition condition)
+  {
+    eventProcessingLock.lock();
+    try {
+      for (Event event : processedEvents) {
+        if (condition.predicate.test(event)) {
+          condition.countDownLatch.countDown();
+        }
+      }
+      waitConditions.add(condition);

Review Comment:
   nit: we don't necessarily need to add the condition if it is already
   satisfied.
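
   A minimal sketch of that check, assuming a simplified, hypothetical
   `WaitCondition` with a latch count of 1 and events modelled as plain
   strings. It uses `synchronized` instead of the PR's `ReentrantLock` only to
   keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;

// Hypothetical, simplified version of the registration logic; not the PR's code.
final class RegisterConditionSketch
{
  static final class WaitCondition
  {
    final Predicate<String> predicate;
    // Assumption: a single matching event satisfies the condition.
    final CountDownLatch countDownLatch = new CountDownLatch(1);

    WaitCondition(Predicate<String> predicate)
    {
      this.predicate = predicate;
    }
  }

  private final List<String> processedEvents = new ArrayList<>();
  private final List<WaitCondition> waitConditions = new ArrayList<>();

  synchronized void recordProcessed(String event)
  {
    processedEvents.add(event);
  }

  synchronized void registerWaitCondition(WaitCondition condition)
  {
    // Replay past events against the new condition.
    for (String event : processedEvents) {
      if (condition.predicate.test(event)) {
        condition.countDownLatch.countDown();
      }
    }
    // Only track the condition if past events have not already satisfied it.
    if (condition.countDownLatch.getCount() > 0) {
      waitConditions.add(condition);
    }
  }

  synchronized int getNumWaitConditions()
  {
    return waitConditions.size();
  }
}
```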



##########
server/src/test/java/org/apache/druid/server/metrics/ServiceStatusMonitorTest.java:
##########
@@ -47,36 +49,27 @@ public void setUp()
   @Test
   public void testDefaultHeartbeatReported()
   {
-    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
     monitor.doMonitor(emitter);
-    Assert.assertEquals(1, emitter.getEvents().size());
-    Assert.assertEquals(HEARTBEAT_METRIC_KEY, emitter.getEvents().get(0).toMap().get("metric"));
-    Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("value"));
+    Assert.assertEquals(1, emitter.getNumEmittedEvents());
+    emitter.verifyValue(HEARTBEAT_METRIC_KEY, 1);
   }
 
   @Test
   public void testLeaderTag()
   {
     heartbeatTags.put("leader", 1);
-    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
     monitor.doMonitor(emitter);
-    Assert.assertEquals(1, emitter.getEvents().size());
-    Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("leader"));
-    Assert.assertEquals(HEARTBEAT_METRIC_KEY, emitter.getEvents().get(0).toMap().get("metric"));
-    Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("value"));
+    Assert.assertEquals(1, emitter.getNumEmittedEvents());
+    emitter.verifyValue(HEARTBEAT_METRIC_KEY, Map.of("leader", 1), 1);
   }
 
   @Test
   public void testMoreThanOneTag()
   {
     heartbeatTags.put("leader", 1);
     heartbeatTags.put("taskRunner", "http");
-    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
     monitor.doMonitor(emitter);
-    Assert.assertEquals(1, emitter.getEvents().size());
-    Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("leader"));
-    Assert.assertEquals("http", emitter.getEvents().get(0).toMap().get("taskRunner"));
-    Assert.assertEquals(HEARTBEAT_METRIC_KEY, emitter.getEvents().get(0).toMap().get("metric"));
-    Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("value"));
+    Assert.assertEquals(1, emitter.getNumEmittedEvents());
+    emitter.verifyValue(HEARTBEAT_METRIC_KEY, Map.of("leader", 1, "taskRunner", "http"), 1);

Review Comment:
   Pretty cool how concise this has become! :)



##########
server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java:
##########
@@ -158,48 +144,64 @@ public void waitForEventAggregate(
     );
   }
 
-  private void triggerConditionEvaluations()
+  private void triggerConditionEvaluations(Event event)
   {
     if (conditionEvaluateExecutor == null) {
       throw new ISE("Cannot evaluate conditions as the 'conditionEvaluateExecutor' is null.");
     } else {
-      conditionEvaluateExecutor.submit(this::evaluateWaitConditions);
+      conditionEvaluateExecutor.submit(() -> evaluateWaitConditions(event));
     }
   }
 
   /**
    * Evaluates wait conditions. This method must be invoked on the
    * {@link #conditionEvaluateExecutor} so that it does not block {@link #emit(Event)}.
    */
-  private void evaluateWaitConditions()
+  private void evaluateWaitConditions(Event event)
   {
-    eventReadWriteLock.readLock().lock();
+    eventProcessingLock.lock();
     try {
       // Create a copy of the conditions for thread-safety
       final List<WaitCondition> conditionsToEvaluate = List.copyOf(waitConditions);
       if (conditionsToEvaluate.isEmpty()) {
         return;
       }
 
-      List<Event> events = getEvents();
       for (WaitCondition condition : conditionsToEvaluate) {
-        final int currentNumberOfEvents = events.size();
-
-        // Do not use an iterator over the list to avoid concurrent modification exceptions
-        // Evaluate new events against this condition
-        for (int i = condition.processedUntil; i < currentNumberOfEvents; ++i) {
-          if (condition.predicate.test(events.get(i))) {
-            condition.countDownLatch.countDown();
-          }
+        if (condition.predicate.test(event)) {
+          condition.countDownLatch.countDown();
         }
-        condition.processedUntil = currentNumberOfEvents;
       }
     }
     catch (Exception e) {
       log.error(e, "Error while evaluating wait conditions");
     }
     finally {
-      eventReadWriteLock.readLock().unlock();
+      processedEvents.add(event);
+      eventProcessingLock.unlock();
+    }
+  }
+
+  /**
+   * Evaluates the given new condition for all past events and then adds it to
+   * {@link #waitConditions}.
+   */
+  private void registerWaitCondition(WaitCondition condition)
+  {
+    eventProcessingLock.lock();
+    try {
+      for (Event event : processedEvents) {
+        if (condition.predicate.test(event)) {
+          condition.countDownLatch.countDown();
+        }

Review Comment:
   nit: shouldn't this be `condition#evaluate`?
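
   A minimal sketch of what such an `evaluate` method could look like, assuming
   a hypothetical generic `WaitConditionSketch` class. The actual
   `WaitCondition` in `LatchableEmitter` may already expose something similar
   with a different shape.

```java
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;

// Hypothetical condition type that keeps the predicate and the latch together.
final class WaitConditionSketch<E>
{
  private final Predicate<E> predicate;
  private final CountDownLatch countDownLatch;

  WaitConditionSketch(Predicate<E> predicate, int requiredMatches)
  {
    this.predicate = predicate;
    this.countDownLatch = new CountDownLatch(requiredMatches);
  }

  // Tests a single event and counts down the latch when it matches.
  void evaluate(E event)
  {
    if (predicate.test(event)) {
      countDownLatch.countDown();
    }
  }

  boolean isSatisfied()
  {
    return countDownLatch.getCount() == 0;
  }
}
```

   Callers on both the new-event path and the replay path would then invoke
   `condition.evaluate(event)` rather than repeating the test-and-count-down
   logic in two places.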
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

