This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5a894f8 Added backpressure metric (#6335)
5a894f8 is described below
commit 5a894f830b6d0dd7630bdab87a9fe0fd0237c704
Author: Shiv Toolsidass <[email protected]>
AuthorDate: Sat Sep 29 14:24:04 2018 -0700
Added backpressure metric (#6335)
* Added backpressure metric
* Updated channelReadable to AtomicBoolean and fixed broken test
* Moved backpressure metric logic to NettyHttpClient
* Fix placement of calculating backPressureDuration
---
.../apache/druid/java/util/http/client/NettyHttpClient.java | 6 ++++++
.../java/util/http/client/response/HttpResponseHandler.java | 3 ++-
.../main/java/org/apache/druid/query/DefaultQueryMetrics.java | 6 ++++++
.../src/main/java/org/apache/druid/query/QueryMetrics.java | 5 +++++
.../apache/druid/query/search/DefaultSearchQueryMetrics.java | 6 ++++++
.../apache/druid/query/select/DefaultSelectQueryMetrics.java | 6 ++++++
.../java/org/apache/druid/query/DefaultQueryMetricsTest.java | 5 +++++
.../main/java/org/apache/druid/client/DirectDruidClient.java | 11 +++++++++--
8 files changed, 45 insertions(+), 3 deletions(-)
diff --git a/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java b/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java
index d15e1e6..4603a33 100644
--- a/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java
+++ b/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java
@@ -72,6 +72,7 @@ public class NettyHttpClient extends AbstractHttpClient
private final ResourcePool<String, ChannelFuture> pool;
private final HttpClientConfig.CompressionCodec compressionCodec;
private final Duration defaultReadTimeout;
+ private long backPressureStartTimeNs;
NettyHttpClient(
ResourcePool<String, ChannelFuture> pool,
@@ -212,9 +213,13 @@ public class NettyHttpClient extends AbstractHttpClient
if (suspendWatermark >= 0 && resumeWatermark >= suspendWatermark) {
suspendWatermark = -1;
channel.setReadable(true);
+ long backPressureDuration = System.nanoTime() - backPressureStartTimeNs;
log.debug("[%s] Resumed reads from channel (chunkNum = %,d).", requestDesc, resumeChunkNum);
+ return backPressureDuration;
}
}
+
+ return 0; //If we didn't resume, don't know if backpressure was happening
};
response = handler.handleResponse(httpResponse, trafficCop);
if (response.isFinished()) {
@@ -271,6 +276,7 @@ public class NettyHttpClient extends AbstractHttpClient
suspendWatermark = Math.max(suspendWatermark, currentChunkNum);
if (suspendWatermark > resumeWatermark) {
channel.setReadable(false);
+ backPressureStartTimeNs = System.nanoTime();
log.debug("[%s] Suspended reads from channel (chunkNum = %,d).", requestDesc, currentChunkNum);
}
}
diff --git a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java
index 02d6caa..03e54e7 100644
--- a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java
+++ b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java
@@ -91,7 +91,8 @@ public interface HttpResponseHandler<IntermediateType, FinalType>
* Call this to resume reading after you have suspended it.
*
* @param chunkNum chunk number corresponding to the handleChunk() or handleResponse() call from which you
+ * @return time that backpressure was applied (channel was closed for reads)
*/
- void resume(long chunkNum);
+ long resume(long chunkNum);
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
index 6f8d3c6..b332fec 100644
--- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
@@ -244,6 +244,12 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
}
@Override
+ public QueryMetrics<QueryType> reportBackPressureTime(long timeNs)
+ {
+ return reportMillisTimeMetric("query/node/backpressure", timeNs);
+ }
+
+ @Override
public QueryMetrics<QueryType> reportNodeTime(long timeNs)
{
return reportMillisTimeMetric("query/node/time", timeNs);
diff --git a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
index b8d2b55..c5e32b9 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
@@ -279,6 +279,11 @@ public interface QueryMetrics<QueryType extends Query<?>>
QueryMetrics<QueryType> reportNodeTimeToFirstByte(long timeNs);
/**
+ * Registers "time that channel is unreadable (backpressure)" metric.
+ */
+ QueryMetrics<QueryType> reportBackPressureTime(long timeNs);
+
+ /**
* Registers "node time" metric.
*/
QueryMetrics<QueryType> reportNodeTime(long timeNs);
diff --git a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
index 96b126a..a44b004 100644
--- a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
@@ -209,6 +209,12 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics
}
@Override
+ public QueryMetrics reportBackPressureTime(long timeNs)
+ {
+ return delegateQueryMetrics.reportBackPressureTime(timeNs);
+ }
+
+ @Override
public QueryMetrics reportNodeTime(long timeNs)
{
return delegateQueryMetrics.reportNodeTime(timeNs);
diff --git a/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java
index bf59412..d2b6b60 100644
--- a/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java
@@ -208,6 +208,12 @@ public class DefaultSelectQueryMetrics implements SelectQueryMetrics
}
@Override
+ public QueryMetrics reportBackPressureTime(long timeNs)
+ {
+ return delegateQueryMetrics.reportBackPressureTime(timeNs);
+ }
+
+ @Override
public QueryMetrics reportNodeTime(long timeNs)
{
return delegateQueryMetrics.reportNodeTime(timeNs);
diff --git a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
index ac022ca..16312c6 100644
--- a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
@@ -152,5 +152,10 @@ public class DefaultQueryMetricsTest
actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
Assert.assertEquals("query/node/bytes", actualEvent.get("metric"));
Assert.assertEquals(10L, actualEvent.get("value"));
+
+ queryMetrics.reportBackPressureTime(11000001).emit(serviceEmitter);
+ actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
+ Assert.assertEquals("query/node/backpressure", actualEvent.get("metric"));
+ Assert.assertEquals(11L, actualEvent.get("value"));
}
}
diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
index 60dfd7a..39396b8 100644
--- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
@@ -203,6 +203,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
{
private final AtomicLong totalByteCount = new AtomicLong(0);
private final AtomicLong queuedByteCount = new AtomicLong(0);
+ private final AtomicLong channelSuspendedTime = new AtomicLong(0);
private final BlockingQueue<InputStreamHolder> queue = new LinkedBlockingQueue<>();
private final AtomicBoolean done = new AtomicBoolean(false);
private final AtomicReference<String> fail = new AtomicReference<>();
@@ -244,8 +245,9 @@ public class DirectDruidClient<T> implements QueryRunner<T>
final long currentQueuedByteCount = queuedByteCount.addAndGet(-holder.getLength());
if (usingBackpressure && currentQueuedByteCount < maxQueuedBytes) {
- Preconditions.checkNotNull(trafficCopRef.get(), "No TrafficCop, how can this be?")
- .resume(holder.getChunkNum());
+ long backPressureTime = Preconditions.checkNotNull(trafficCopRef.get(), "No TrafficCop, how can this be?")
+ .resume(holder.getChunkNum());
+ channelSuspendedTime.addAndGet(backPressureTime);
}
return holder.getStream();
@@ -382,6 +384,11 @@ public class DirectDruidClient<T> implements QueryRunner<T>
QueryMetrics<? super Query<T>> responseMetrics = acquireResponseMetrics();
responseMetrics.reportNodeTime(nodeTimeNs);
responseMetrics.reportNodeBytes(totalByteCount.get());
+
+ if (usingBackpressure) {
+ responseMetrics.reportBackPressureTime(channelSuspendedTime.get());
+ }
+
responseMetrics.emit(emitter);
synchronized (done) {
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]