This is an automated email from the ASF dual-hosted git repository.
capistrant pushed a commit to branch 34.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/34.0.0 by this push:
new a715ffdb8ac Fix concurrency issues with StubServiceEmitter (#18249)
(#18260)
a715ffdb8ac is described below
commit a715ffdb8acdb693584ee0addf318843c96e539b
Author: Lucas Capistrant <[email protected]>
AuthorDate: Tue Jul 15 13:55:50 2025 -0500
Fix concurrency issues with StubServiceEmitter (#18249) (#18260)
* concurrency issue with StubServiceEmitter - #18121 added a few new
metrics, which increased the load on it and caused intermittent occurrences
of an issue arising from the fact that it used an `ArrayList` under the hood.
* added some catch blocks to shut down queries properly in case unexpected
exceptions occur - this could give better exceptions and reduce time to fix in
the future; it should also reduce the probability that `QTest` splits remain
hanging
Co-authored-by: Zoltan Haindrich <[email protected]>
---
.../msq/dart/controller/sql/DartQueryMaker.java | 9 +++++++++
.../org/apache/druid/msq/exec/ControllerImpl.java | 13 +++++++++++++
.../overlord/duty/UnusedSegmentsKillerTest.java | 3 ++-
pom.xml | 2 +-
.../util/metrics/HttpPostEmitterMonitorTest.java | 8 ++++----
.../druid/java/util/metrics/StubServiceEmitter.java | 21 ++++++++++++---------
.../appenderator/StreamAppenderatorTest.java | 5 +++--
.../druid/server/audit/SQLAuditManagerTest.java | 13 +++++++------
.../druid/server/metrics/LatchableEmitter.java | 5 +++--
9 files changed, 54 insertions(+), 25 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
index a6583db66fc..60563d6daf4 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
@@ -405,6 +405,15 @@ public class DartQueryMaker implements QueryMaker
controller.queryId()
);
}
+ catch (Throwable e) {
+ log.error(
+ e,
+ "Controller failed for sqlQueryId[%s], controllerHost[%s]",
+ plannerContext.getSqlQueryId(),
+ controller.queryId()
+ );
+ throw e;
+ }
finally {
controllerRegistry.deregister(controllerHolder);
Thread.currentThread().setName(threadName);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 9c3d94094cb..a68a58ae476 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -343,10 +343,23 @@ public class ControllerImpl implements Controller
try (final Closer closer = Closer.create()) {
reportPayload = runInternal(queryListener, closer);
}
+ catch (Throwable e) {
+ log.error(e, "Controller internal execution encountered exception.");
+ queryListener.onQueryComplete(makeStatusReportForException(e));
+ throw e;
+ }
// Call onQueryComplete after Closer is fully closed, ensuring no
controller-related processing is ongoing.
queryListener.onQueryComplete(reportPayload);
}
+
+ private MSQTaskReportPayload makeStatusReportForException(Throwable e)
+ {
+ MSQErrorReport errorReport = MSQErrorReport.fromFault(queryId(), null,
null, UnknownFault.forException(e));
+ MSQStatusReport statusReport = new MSQStatusReport(TaskState.FAILED,
errorReport, null, null, 0, new HashMap<>(), 0, 0, null, null);
+ return new MSQTaskReportPayload(statusReport, null, null, null);
+ }
+
@Override
public void stop(CancellationReason reason)
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
index 1130eefa8d7..111489a74dc 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
@@ -52,6 +52,7 @@ import org.junit.Rule;
import org.junit.Test;
import java.util.List;
+import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
@@ -328,7 +329,7 @@ public class UnusedSegmentsKillerTest
emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10);
// Verify that the kill intervals are sorted with the oldest interval first
- final List<StubServiceEmitter.ServiceMetricEventSnapshot> events =
+ final Queue<StubServiceEmitter.ServiceMetricEventSnapshot> events =
emitter.getMetricEvents().get(TaskMetrics.RUN_DURATION);
final List<Interval> killIntervals = events.stream().map(event -> {
final String taskId = (String)
event.getUserDims().get(DruidMetrics.TASK_ID);
diff --git a/pom.xml b/pom.xml
index cf282bc3cb2..0b432865810 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1094,7 +1094,7 @@
<dependency>
<groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId>
- <version>5.10.2</version>
+ <version>5.13.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
diff --git
a/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java
b/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java
index 601dc4a61b4..2be52c091e7 100644
---
a/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java
@@ -25,8 +25,8 @@ import
org.apache.druid.java.util.emitter.core.HttpPostEmitter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.List;
import java.util.Map;
+import java.util.Queue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -65,7 +65,7 @@ public class HttpPostEmitterMonitorTest
assertTrue(monitor.doMonitor(stubServiceEmitter));
- final Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>>
metricEvents = stubServiceEmitter.getMetricEvents();
+ final Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>>
metricEvents = stubServiceEmitter.getMetricEvents();
assertMetricValue(metricEvents, "emitter/successfulSending/maxTimeMs", 0);
assertMetricValue(metricEvents, "emitter/events/emitted/delta", 100L);
@@ -83,8 +83,8 @@ public class HttpPostEmitterMonitorTest
assertMetricValue(metricEvents, "emitter/failedSending/maxTimeMs", 0L);
}
- private void assertMetricValue(Map<String,
List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents, String
metricName, Number expectedValue)
+ private void assertMetricValue(Map<String,
Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents, String
metricName, Number expectedValue)
{
-
assertEquals(metricEvents.get(metricName).get(0).getMetricEvent().getValue().doubleValue(),
expectedValue.doubleValue());
+
assertEquals(metricEvents.get(metricName).peek().getMetricEvent().getValue().doubleValue(),
expectedValue.doubleValue());
}
}
diff --git
a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
index 323d8cd308c..55113b97ac2 100644
---
a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
+++
b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
@@ -24,11 +24,14 @@ import
org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
/**
* Test implementation of {@link ServiceEmitter} that collects emitted metrics
@@ -36,9 +39,9 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class StubServiceEmitter extends ServiceEmitter implements
MetricsVerifier
{
- private final List<Event> events = new ArrayList<>();
- private final List<AlertEvent> alertEvents = new ArrayList<>();
- private final ConcurrentHashMap<String, List<ServiceMetricEventSnapshot>>
metricEvents = new ConcurrentHashMap<>();
+ private final Queue<Event> events = new ConcurrentLinkedDeque<>();
+ private final Queue<AlertEvent> alertEvents = new ConcurrentLinkedDeque<>();
+ private final ConcurrentHashMap<String, Queue<ServiceMetricEventSnapshot>>
metricEvents = new ConcurrentHashMap<>();
public StubServiceEmitter()
{
@@ -55,7 +58,7 @@ public class StubServiceEmitter extends ServiceEmitter
implements MetricsVerifie
{
if (event instanceof ServiceMetricEvent) {
ServiceMetricEvent metricEvent = (ServiceMetricEvent) event;
- metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new
ArrayList<>())
+ metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new
ConcurrentLinkedDeque<>())
.add(new ServiceMetricEventSnapshot(metricEvent));
} else if (event instanceof AlertEvent) {
alertEvents.add((AlertEvent) event);
@@ -68,7 +71,7 @@ public class StubServiceEmitter extends ServiceEmitter
implements MetricsVerifie
*/
public List<Event> getEvents()
{
- return events;
+ return new ArrayList<>(events);
}
/**
@@ -76,7 +79,7 @@ public class StubServiceEmitter extends ServiceEmitter
implements MetricsVerifie
*
* @return Map from metric name to list of events emitted for that metric.
*/
- public Map<String, List<ServiceMetricEventSnapshot>> getMetricEvents()
+ public Map<String, Queue<ServiceMetricEventSnapshot>> getMetricEvents()
{
return metricEvents;
}
@@ -86,7 +89,7 @@ public class StubServiceEmitter extends ServiceEmitter
implements MetricsVerifie
*/
public List<AlertEvent> getAlerts()
{
- return alertEvents;
+ return new ArrayList<>(alertEvents);
}
@Override
@@ -96,8 +99,8 @@ public class StubServiceEmitter extends ServiceEmitter
implements MetricsVerifie
)
{
final List<Number> values = new ArrayList<>();
- final List<ServiceMetricEventSnapshot> events =
- metricEvents.getOrDefault(metricName, Collections.emptyList());
+ final Queue<ServiceMetricEventSnapshot> events =
+ metricEvents.getOrDefault(metricName, new ArrayDeque<>());
final Map<String, Object> filters =
dimensionFilters == null ? Collections.emptyMap() : dimensionFilters;
for (ServiceMetricEventSnapshot event : events) {
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
index fee0d2ca757..463c4ed8c53 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
@@ -78,6 +78,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -162,7 +163,7 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
).get();
Assert.assertEquals(
ImmutableMap.of("x", "3"),
- (Map<String, String>) segmentsAndCommitMetadata.getCommitMetadata()
+ segmentsAndCommitMetadata.getCommitMetadata()
);
Assert.assertEquals(
IDENTIFIERS.subList(0, 2),
@@ -2278,7 +2279,7 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
private void verifySinkMetrics(StubServiceEmitter emitter, Set<String>
segmentIds)
{
- Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> events =
emitter.getMetricEvents();
+ Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> events =
emitter.getMetricEvents();
int segments = segmentIds.size();
Assert.assertEquals(4, events.size());
Assert.assertTrue(events.containsKey("query/cpu/time"));
diff --git
a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
index f10248bf1e9..3505eb943a2 100644
---
a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
+++
b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
@@ -43,6 +43,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.TreeMap;
@RunWith(MockitoJUnitRunner.class)
@@ -91,14 +92,14 @@ public class SQLAuditManagerTest
final AuditEntry entry = createAuditEntry("testKey", "testType",
DateTimes.nowUtc());
auditManager.doAudit(entry);
- Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>>
metricEvents = serviceEmitter.getMetricEvents();
+ Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>>
metricEvents = serviceEmitter.getMetricEvents();
Assert.assertEquals(1, metricEvents.size());
- List<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents =
metricEvents.get("config/audit");
+ Queue<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents =
metricEvents.get("config/audit");
Assert.assertNotNull(auditMetricEvents);
Assert.assertEquals(1, auditMetricEvents.size());
- ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent();
+ ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent();
final AuditEntry dbEntry = lookupAuditEntryForKey("testKey");
Assert.assertNotNull(dbEntry);
@@ -120,14 +121,14 @@ public class SQLAuditManagerTest
Assert.assertEquals(entry, dbEntry);
// Verify emitted metrics
- Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>>
metricEvents = serviceEmitter.getMetricEvents();
+ Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>>
metricEvents = serviceEmitter.getMetricEvents();
Assert.assertEquals(1, metricEvents.size());
- List<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents =
metricEvents.get("config/audit");
+ Queue<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents =
metricEvents.get("config/audit");
Assert.assertNotNull(auditMetricEvents);
Assert.assertEquals(1, auditMetricEvents.size());
- ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent();
+ ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent();
Assert.assertEquals(dbEntry.getKey(), metric.getUserDims().get("key"));
Assert.assertEquals(dbEntry.getType(), metric.getUserDims().get("type"));
Assert.assertNull(metric.getUserDims().get("payload"));
diff --git
a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
index 87a5b612147..aeed3c40ea9 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
@@ -181,13 +181,14 @@ public class LatchableEmitter extends StubServiceEmitter
return;
}
+ List<Event> events = getEvents();
for (WaitCondition condition : conditionsToEvaluate) {
- final int currentNumberOfEvents = getEvents().size();
+ final int currentNumberOfEvents = events.size();
// Do not use an iterator over the list to avoid concurrent
modification exceptions
// Evaluate new events against this condition
for (int i = condition.processedUntil; i < currentNumberOfEvents; ++i)
{
- if (condition.predicate.test(getEvents().get(i))) {
+ if (condition.predicate.test(events.get(i))) {
condition.countDownLatch.countDown();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]