gianm closed pull request #6335: Added backpressure metric
URL: https://github.com/apache/incubator-druid/pull/6335
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request came from a fork, GitHub does not display its
diff once it has been merged, so the full diff is included below:

diff --git 
a/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java
 
b/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java
index d15e1e6aa76..4603a33ba31 100644
--- 
a/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java
+++ 
b/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java
@@ -72,6 +72,7 @@
   private final ResourcePool<String, ChannelFuture> pool;
   private final HttpClientConfig.CompressionCodec compressionCodec;
   private final Duration defaultReadTimeout;
+  private long backPressureStartTimeNs;
 
   NettyHttpClient(
       ResourcePool<String, ChannelFuture> pool,
@@ -212,9 +213,13 @@ public void messageReceived(ChannelHandlerContext ctx, 
MessageEvent e)
                     if (suspendWatermark >= 0 && resumeWatermark >= 
suspendWatermark) {
                       suspendWatermark = -1;
                       channel.setReadable(true);
+                      long backPressureDuration = System.nanoTime() - 
backPressureStartTimeNs;
                       log.debug("[%s] Resumed reads from channel (chunkNum = 
%,d).", requestDesc, resumeChunkNum);
+                      return backPressureDuration;
                     }
                   }
+
+                  return 0; //If we didn't resume, don't know if backpressure 
was happening
                 };
                 response = handler.handleResponse(httpResponse, trafficCop);
                 if (response.isFinished()) {
@@ -271,6 +276,7 @@ private void possiblySuspendReads(ClientResponse<?> 
response)
                 suspendWatermark = Math.max(suspendWatermark, currentChunkNum);
                 if (suspendWatermark > resumeWatermark) {
                   channel.setReadable(false);
+                  backPressureStartTimeNs = System.nanoTime();
                   log.debug("[%s] Suspended reads from channel (chunkNum = 
%,d).", requestDesc, currentChunkNum);
                 }
               }
diff --git 
a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java
 
b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java
index 02d6caa6f2f..03e54e702e0 100644
--- 
a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java
+++ 
b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java
@@ -91,7 +91,8 @@
      * Call this to resume reading after you have suspended it.
      *
      * @param chunkNum chunk number corresponding to the handleChunk() or 
handleResponse() call from which you
+     * @return time that backpressure was applied (channel was closed for 
reads)
      */
-    void resume(long chunkNum);
+    long resume(long chunkNum);
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java 
b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
index 6f8d3c6c2b3..b332fec98df 100644
--- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
@@ -243,6 +243,12 @@ public void identity(String identity)
     return reportMillisTimeMetric("query/node/ttfb", timeNs);
   }
 
+  @Override
+  public QueryMetrics<QueryType> reportBackPressureTime(long timeNs)
+  {
+    return reportMillisTimeMetric("query/node/backpressure", timeNs);
+  }
+
   @Override
   public QueryMetrics<QueryType> reportNodeTime(long timeNs)
   {
diff --git a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java 
b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
index b8d2b558b0f..c5e32b968a3 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
@@ -278,6 +278,11 @@
    */
   QueryMetrics<QueryType> reportNodeTimeToFirstByte(long timeNs);
 
+  /**
+   * Registers "time that channel is unreadable (backpressure)" metric.
+   */
+  QueryMetrics<QueryType> reportBackPressureTime(long timeNs);
+
   /**
    * Registers "node time" metric.
    */
diff --git 
a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
 
b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
index 96b126a9be0..a44b004a91f 100644
--- 
a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
+++ 
b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
@@ -208,6 +208,12 @@ public QueryMetrics reportNodeTimeToFirstByte(long timeNs)
     return delegateQueryMetrics.reportNodeTimeToFirstByte(timeNs);
   }
 
+  @Override
+  public QueryMetrics reportBackPressureTime(long timeNs)
+  {
+    return delegateQueryMetrics.reportBackPressureTime(timeNs);
+  }
+
   @Override
   public QueryMetrics reportNodeTime(long timeNs)
   {
diff --git 
a/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java
 
b/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java
index bf594127d00..d2b6b604498 100644
--- 
a/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java
+++ 
b/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java
@@ -207,6 +207,12 @@ public QueryMetrics reportNodeTimeToFirstByte(long timeNs)
     return delegateQueryMetrics.reportNodeTimeToFirstByte(timeNs);
   }
 
+  @Override
+  public QueryMetrics reportBackPressureTime(long timeNs)
+  {
+    return delegateQueryMetrics.reportBackPressureTime(timeNs);
+  }
+
   @Override
   public QueryMetrics reportNodeTime(long timeNs)
   {
diff --git 
a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java 
b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
index ac022cacfff..16312c6ea57 100644
--- 
a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
@@ -152,5 +152,10 @@ public static void 
testQueryMetricsDefaultMetricNamesAndUnits(
     actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
     Assert.assertEquals("query/node/bytes", actualEvent.get("metric"));
     Assert.assertEquals(10L, actualEvent.get("value"));
+
+    queryMetrics.reportBackPressureTime(11000001).emit(serviceEmitter);
+    actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
+    Assert.assertEquals("query/node/backpressure", actualEvent.get("metric"));
+    Assert.assertEquals(11L, actualEvent.get("value"));
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java 
b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
index 60dfd7a0c45..39396b8bd7d 100644
--- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
@@ -203,6 +203,7 @@ public int getNumOpenConnections()
       {
         private final AtomicLong totalByteCount = new AtomicLong(0);
         private final AtomicLong queuedByteCount = new AtomicLong(0);
+        private final AtomicLong channelSuspendedTime = new AtomicLong(0);
         private final BlockingQueue<InputStreamHolder> queue = new 
LinkedBlockingQueue<>();
         private final AtomicBoolean done = new AtomicBoolean(false);
         private final AtomicReference<String> fail = new AtomicReference<>();
@@ -244,8 +245,9 @@ private InputStream dequeue() throws InterruptedException
 
           final long currentQueuedByteCount = 
queuedByteCount.addAndGet(-holder.getLength());
           if (usingBackpressure && currentQueuedByteCount < maxQueuedBytes) {
-            Preconditions.checkNotNull(trafficCopRef.get(), "No TrafficCop, 
how can this be?")
-                         .resume(holder.getChunkNum());
+            long backPressureTime = 
Preconditions.checkNotNull(trafficCopRef.get(), "No TrafficCop, how can this 
be?")
+                                                 .resume(holder.getChunkNum());
+            channelSuspendedTime.addAndGet(backPressureTime);
           }
 
           return holder.getStream();
@@ -382,6 +384,11 @@ public InputStream nextElement()
           QueryMetrics<? super Query<T>> responseMetrics = 
acquireResponseMetrics();
           responseMetrics.reportNodeTime(nodeTimeNs);
           responseMetrics.reportNodeBytes(totalByteCount.get());
+
+          if (usingBackpressure) {
+            responseMetrics.reportBackPressureTime(channelSuspendedTime.get());
+          }
+
           responseMetrics.emit(emitter);
           synchronized (done) {
             try {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to