kgyrtkirk commented on code in PR #18255:
URL: https://github.com/apache/druid/pull/18255#discussion_r2207711845
##########
processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java:
##########
@@ -74,14 +74,20 @@ public List<Event> getEvents()
return new ArrayList<>(events);
}
+ public int getNumEmittedEvents()
+ {
+ return events.size();
+ }
+
/**
- * Gets all the metric events emitted since the previous {@link #flush()}.
+ * Gets all the metric events emitted for the given metric name since the
previous {@link #flush()}.
*
- * @return Map from metric name to list of events emitted for that metric.
+ * @return List of events emitted for the given metric.
*/
- public Map<String, Queue<ServiceMetricEventSnapshot>> getMetricEvents()
+ public List<ServiceMetricEventSnapshot> getMetricEvents(String metricName)
Review Comment:
this is exactly what I should have done in the previous PR....but rushed to
get it running....
##########
server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java:
##########
@@ -987,10 +986,11 @@ public void testMetricsWithMaxSubqueryRowsEnabled()
ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
);
- List<Event> events = emitter.getEvents();
+ List<StubServiceEmitter.ServiceMetricEventSnapshot> events =
+ emitter.getMetricEvents(ClientQuerySegmentWalker.ROWS_COUNT_METRIC);
- for (Event event : events) {
- EventMap map = event.toMap();
+ for (StubServiceEmitter.ServiceMetricEventSnapshot event : events) {
Review Comment:
nit: right now there is `event.getUserDims()` and
`event.getMetricEvent().getUserDims()` ; I have no clear understanding why
this `ServiceMetricEventSnapshot` is necessary ; but if the `userDims` or some
other things might be clobbered externally; it would be more straight to just
add a copy constructor to `ServiceMetricEvent` -
##########
services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java:
##########
@@ -683,7 +683,8 @@ protected void doService(
}
catch (NullPointerException ignored) {
}
- Assert.assertEquals("query/time",
stubServiceEmitter.getEvents().get(0).toMap().get("metric"));
+ // Assert.assertEquals("query/time",
stubServiceEmitter.getEvents().get(0).toMap().get("metric"));
Review Comment:
nit: commented old code
##########
server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java:
##########
@@ -1038,23 +1038,26 @@ public void testMetricsWithMaxSubqueryBytesEnabled()
ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
);
- List<Event> events = emitter.getEvents();
+ List<StubServiceEmitter.ServiceMetricEventSnapshot> events
+ = emitter.getMetricEvents(ClientQuerySegmentWalker.ROWS_COUNT_METRIC);
Review Comment:
nit: could be inlined into the `for`
##########
server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java:
##########
@@ -60,7 +60,13 @@ public class LatchableEmitter extends StubServiceEmitter
*/
private final ScheduledExecutorService conditionEvaluateExecutor;
private final Set<WaitCondition> waitConditions = new HashSet<>();
- private final ReentrantReadWriteLock eventReadWriteLock = new
ReentrantReadWriteLock(true);
+
+ private final ReentrantLock eventProcessingLock = new ReentrantLock();
+
+ /**
+ * Lists of events that have already been processed by {@link
#evaluateWaitConditions(Event)}.
+ */
+ private final List<Event> processedEvents = new ArrayList<>();
Review Comment:
no `clear` is called on this field - even during `flush`
could that cause problems?
##########
server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java:
##########
@@ -158,48 +144,64 @@ public void waitForEventAggregate(
);
}
- private void triggerConditionEvaluations()
+ private void triggerConditionEvaluations(Event event)
{
if (conditionEvaluateExecutor == null) {
throw new ISE("Cannot evaluate conditions as the
'conditionEvaluateExecutor' is null.");
} else {
- conditionEvaluateExecutor.submit(this::evaluateWaitConditions);
+ conditionEvaluateExecutor.submit(() -> evaluateWaitConditions(event));
}
}
/**
* Evaluates wait conditions. This method must be invoked on the
* {@link #conditionEvaluateExecutor} so that it does not block {@link
#emit(Event)}.
*/
- private void evaluateWaitConditions()
+ private void evaluateWaitConditions(Event event)
{
- eventReadWriteLock.readLock().lock();
+ eventProcessingLock.lock();
try {
// Create a copy of the conditions for thread-safety
final List<WaitCondition> conditionsToEvaluate =
List.copyOf(waitConditions);
if (conditionsToEvaluate.isEmpty()) {
return;
}
- List<Event> events = getEvents();
for (WaitCondition condition : conditionsToEvaluate) {
- final int currentNumberOfEvents = events.size();
-
- // Do not use an iterator over the list to avoid concurrent
modification exceptions
- // Evaluate new events against this condition
- for (int i = condition.processedUntil; i < currentNumberOfEvents; ++i)
{
- if (condition.predicate.test(events.get(i))) {
- condition.countDownLatch.countDown();
- }
+ if (condition.predicate.test(event)) {
+ condition.countDownLatch.countDown();
}
- condition.processedUntil = currentNumberOfEvents;
}
}
catch (Exception e) {
log.error(e, "Error while evaluating wait conditions");
Review Comment:
this is partially unrelated; but I think this should be a serious error -
(possibly the predicate is broken?)...repack into `RuntimeException` if
necessary?
##########
server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java:
##########
@@ -107,9 +93,9 @@ public void close()
public void waitForEvent(Predicate<Event> condition, long timeoutMillis)
{
final WaitCondition waitCondition = new WaitCondition(condition);
+ registerWaitCondition(waitCondition);
waitConditions.add(waitCondition);
Review Comment:
nit: `register` may already have added it to the `waitConditions`
##########
server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java:
##########
@@ -158,48 +144,64 @@ public void waitForEventAggregate(
);
}
- private void triggerConditionEvaluations()
+ private void triggerConditionEvaluations(Event event)
{
if (conditionEvaluateExecutor == null) {
throw new ISE("Cannot evaluate conditions as the
'conditionEvaluateExecutor' is null.");
} else {
- conditionEvaluateExecutor.submit(this::evaluateWaitConditions);
+ conditionEvaluateExecutor.submit(() -> evaluateWaitConditions(event));
}
}
/**
* Evaluates wait conditions. This method must be invoked on the
* {@link #conditionEvaluateExecutor} so that it does not block {@link
#emit(Event)}.
*/
- private void evaluateWaitConditions()
+ private void evaluateWaitConditions(Event event)
{
- eventReadWriteLock.readLock().lock();
+ eventProcessingLock.lock();
try {
// Create a copy of the conditions for thread-safety
final List<WaitCondition> conditionsToEvaluate =
List.copyOf(waitConditions);
if (conditionsToEvaluate.isEmpty()) {
return;
}
- List<Event> events = getEvents();
for (WaitCondition condition : conditionsToEvaluate) {
- final int currentNumberOfEvents = events.size();
-
- // Do not use an iterator over the list to avoid concurrent
modification exceptions
- // Evaluate new events against this condition
- for (int i = condition.processedUntil; i < currentNumberOfEvents; ++i)
{
- if (condition.predicate.test(events.get(i))) {
- condition.countDownLatch.countDown();
- }
+ if (condition.predicate.test(event)) {
+ condition.countDownLatch.countDown();
}
- condition.processedUntil = currentNumberOfEvents;
}
}
catch (Exception e) {
log.error(e, "Error while evaluating wait conditions");
}
finally {
- eventReadWriteLock.readLock().unlock();
+ processedEvents.add(event);
+ eventProcessingLock.unlock();
+ }
+ }
+
+ /**
+ * Evaluates the given new condition for all past events and then adds it to
+ * {@link #waitConditions}.
+ */
+ private void registerWaitCondition(WaitCondition condition)
+ {
+ eventProcessingLock.lock();
+ try {
+ for (Event event : processedEvents) {
+ if (condition.predicate.test(event)) {
+ condition.countDownLatch.countDown();
+ }
+ }
+ waitConditions.add(condition);
Review Comment:
nit: we don't necessarily need to add if the condition is already satisfied
##########
server/src/test/java/org/apache/druid/server/metrics/ServiceStatusMonitorTest.java:
##########
@@ -47,36 +49,27 @@ public void setUp()
@Test
public void testDefaultHeartbeatReported()
{
- final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
monitor.doMonitor(emitter);
- Assert.assertEquals(1, emitter.getEvents().size());
- Assert.assertEquals(HEARTBEAT_METRIC_KEY,
emitter.getEvents().get(0).toMap().get("metric"));
- Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("value"));
+ Assert.assertEquals(1, emitter.getNumEmittedEvents());
+ emitter.verifyValue(HEARTBEAT_METRIC_KEY, 1);
}
@Test
public void testLeaderTag()
{
heartbeatTags.put("leader", 1);
- final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
monitor.doMonitor(emitter);
- Assert.assertEquals(1, emitter.getEvents().size());
- Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("leader"));
- Assert.assertEquals(HEARTBEAT_METRIC_KEY,
emitter.getEvents().get(0).toMap().get("metric"));
- Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("value"));
+ Assert.assertEquals(1, emitter.getNumEmittedEvents());
+ emitter.verifyValue(HEARTBEAT_METRIC_KEY, Map.of("leader", 1), 1);
}
@Test
public void testMoreThanOneTag()
{
heartbeatTags.put("leader", 1);
heartbeatTags.put("taskRunner", "http");
- final StubServiceEmitter emitter = new StubServiceEmitter("service",
"host");
monitor.doMonitor(emitter);
- Assert.assertEquals(1, emitter.getEvents().size());
- Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("leader"));
- Assert.assertEquals("http",
emitter.getEvents().get(0).toMap().get("taskRunner"));
- Assert.assertEquals(HEARTBEAT_METRIC_KEY,
emitter.getEvents().get(0).toMap().get("metric"));
- Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("value"));
+ Assert.assertEquals(1, emitter.getNumEmittedEvents());
+ emitter.verifyValue(HEARTBEAT_METRIC_KEY, Map.of("leader", 1,
"taskRunner", "http"), 1);
Review Comment:
pretty cool how concise this become! :)
##########
server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java:
##########
@@ -158,48 +144,64 @@ public void waitForEventAggregate(
);
}
- private void triggerConditionEvaluations()
+ private void triggerConditionEvaluations(Event event)
{
if (conditionEvaluateExecutor == null) {
throw new ISE("Cannot evaluate conditions as the
'conditionEvaluateExecutor' is null.");
} else {
- conditionEvaluateExecutor.submit(this::evaluateWaitConditions);
+ conditionEvaluateExecutor.submit(() -> evaluateWaitConditions(event));
}
}
/**
* Evaluates wait conditions. This method must be invoked on the
* {@link #conditionEvaluateExecutor} so that it does not block {@link
#emit(Event)}.
*/
- private void evaluateWaitConditions()
+ private void evaluateWaitConditions(Event event)
{
- eventReadWriteLock.readLock().lock();
+ eventProcessingLock.lock();
try {
// Create a copy of the conditions for thread-safety
final List<WaitCondition> conditionsToEvaluate =
List.copyOf(waitConditions);
if (conditionsToEvaluate.isEmpty()) {
return;
}
- List<Event> events = getEvents();
for (WaitCondition condition : conditionsToEvaluate) {
- final int currentNumberOfEvents = events.size();
-
- // Do not use an iterator over the list to avoid concurrent
modification exceptions
- // Evaluate new events against this condition
- for (int i = condition.processedUntil; i < currentNumberOfEvents; ++i)
{
- if (condition.predicate.test(events.get(i))) {
- condition.countDownLatch.countDown();
- }
+ if (condition.predicate.test(event)) {
+ condition.countDownLatch.countDown();
}
- condition.processedUntil = currentNumberOfEvents;
}
}
catch (Exception e) {
log.error(e, "Error while evaluating wait conditions");
}
finally {
- eventReadWriteLock.readLock().unlock();
+ processedEvents.add(event);
+ eventProcessingLock.unlock();
+ }
+ }
+
+ /**
+ * Evaluates the given new condition for all past events and then adds it to
+ * {@link #waitConditions}.
+ */
+ private void registerWaitCondition(WaitCondition condition)
+ {
+ eventProcessingLock.lock();
+ try {
+ for (Event event : processedEvents) {
+ if (condition.predicate.test(event)) {
+ condition.countDownLatch.countDown();
+ }
Review Comment:
nit: shouldn't this be `condition#evaluate` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]