LakshSingla commented on code in PR #17360:
URL: https://github.com/apache/druid/pull/17360#discussion_r1803801541


##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.query.groupby.epinephelinae.LimitedTemporaryStorage;
+
+import javax.inject.Inject;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Collects stats for group by queries like used merged buffer count, spilled bytes and group by resource acquisition time.
+ */
+@LazySingleton
+public class GroupByStatsProvider
+{
+  private final AtomicLong resourceAcquisitionTimeNs = new AtomicLong(0);
+  private final AtomicLong resourceAcquisitionCount = new AtomicLong(0);
+
+  private final ConcurrentLinkedQueue<LimitedTemporaryStorage> temporaryStorages;
+
+  @Inject

Review Comment:
   nit: We shouldn't need this `@Inject`.



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java:
##########
@@ -0,0 +1,87 @@
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.query.groupby.epinephelinae.LimitedTemporaryStorage;
+
+import javax.inject.Inject;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Collects stats for group by queries like used merged buffer count, spilled bytes and group by resource acquisition time.
+ */
+@LazySingleton
+public class GroupByStatsProvider
+{
+  private final AtomicLong resourceAcquisitionTimeNs = new AtomicLong(0);
+  private final AtomicLong resourceAcquisitionCount = new AtomicLong(0);
+
+  private final ConcurrentLinkedQueue<LimitedTemporaryStorage> temporaryStorages;
+
+  @Inject
+  public GroupByStatsProvider()
+  {
+    this.temporaryStorages = new ConcurrentLinkedQueue<>();
+  }
+
+  public synchronized void groupByResourceAcquisitionTimeNs(long delayNs)
+  {
+    resourceAcquisitionTimeNs.addAndGet(delayNs);
+    resourceAcquisitionCount.incrementAndGet();
+  }
+
+  public synchronized long getAndResetGroupByResourceAcquisitionStats()

Review Comment:
   This doesn't seem correct. We are reporting the average as a metric. A better approach would be to report the sum as well as the count, which lets consumers compute a correctly weighted average across emission periods instead of relying on an average precomputed by the monitor. While we are amortizing a lot of metrics by emitting them from the monitor, I think it's still better to report the count and sum separately.
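A minimal sketch of why sum and count are more useful than a precomputed average, assuming each emission period reports its raw total time and acquisition count. `AcquisitionPeriod` and its helpers are hypothetical names, not part of the PR; the sketch only contrasts the two ways a consumer could aggregate.

```java
import java.util.List;

// Hypothetical per-period report: raw sum and count, as suggested above.
final class AcquisitionPeriod
{
  final long totalNs;
  final long count;

  AcquisitionPeriod(long totalNs, long count)
  {
    this.totalNs = totalNs;
    this.count = count;
  }

  // Correct aggregate: total time divided by total acquisitions across all periods.
  static double weightedAverage(List<AcquisitionPeriod> periods)
  {
    long sum = 0;
    long n = 0;
    for (AcquisitionPeriod p : periods) {
      sum += p.totalNs;
      n += p.count;
    }
    return n == 0 ? 0.0 : (double) sum / n;
  }

  // All a consumer can do if only per-period averages are emitted.
  static double averageOfAverages(List<AcquisitionPeriod> periods)
  {
    double sum = 0;
    int nonEmpty = 0;
    for (AcquisitionPeriod p : periods) {
      if (p.count > 0) {
        sum += (double) p.totalNs / p.count;
        nonEmpty++;
      }
    }
    return nonEmpty == 0 ? 0.0 : sum / nonEmpty;
  }
}
```

With one 100 ns acquisition in the first period and three 10 ns acquisitions in the second, the weighted average is 32.5 ns, while averaging the two per-period averages gives 55 ns.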
   



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java:
##########
@@ -0,0 +1,94 @@
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.collections.BlockingPool;
+import org.apache.druid.guice.annotations.Merging;
+import org.apache.druid.query.groupby.epinephelinae.LimitedTemporaryStorage;
+
+import javax.inject.Inject;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Collects stats for group by queries like used merged buffer count, spilled bytes and group by resource acquisition time.
+ */
+public class GroupByStatsProvider
+{
+  private final AtomicLong groupByResourceAcquisitionTimeNs = new AtomicLong(0);
+  private final AtomicLong groupByResourceAcquisitionCount = new AtomicLong(0);
+
+  private final BlockingPool<ByteBuffer> blockingPool;
+  private final ConcurrentLinkedQueue<LimitedTemporaryStorage> temporaryStorages;
+
+  @Inject
+  public GroupByStatsProvider(@Merging BlockingPool<ByteBuffer> blockingPool)
+  {
+    this.blockingPool = blockingPool;
+    this.temporaryStorages = new ConcurrentLinkedQueue<>();
+  }
+
+  public synchronized void groupByResourceAcquisitionTimeNs(long delayNs)
+  {
+    groupByResourceAcquisitionTimeNs.addAndGet(delayNs);
+    groupByResourceAcquisitionCount.incrementAndGet();
+  }
+
+  public synchronized long getAndResetGroupByResourceAcquisitionStats()
+  {
+    long average = (groupByResourceAcquisitionTimeNs.get() / groupByResourceAcquisitionCount.get());
+
+    groupByResourceAcquisitionTimeNs.set(0);
+    groupByResourceAcquisitionCount.set(0);
+
+    return average;
+  }
+
+  public long getAcquiredMergeBufferCount()
+  {
+    return blockingPool.getUsedBufferCount();
+  }
+
+  public void registerTemporaryStorage(LimitedTemporaryStorage temporaryStorage)
+  {
+    temporaryStorages.add(temporaryStorage);
+  }
+
+  public long getSpilledBytes()

Review Comment:
   Regardless of this, I think there should be a metric (either this one after repurposing, or a new one) that indicates the total size of the spilled data per query. This would let admins estimate whether queries need a larger merge buffer, and by how much. WDYT?
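One possible shape for a per-query figure, sketched under the assumption that spill writes can be attributed to a query ID and that the total is read once when the query finishes. `PerQuerySpillTracker`, `recordSpill`, and `finishQuery` are hypothetical names, not Druid APIs. A per-query total, rather than a node-wide sum, is what an admin could compare against the configured merge buffer size.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical per-query spill accounting; names are illustrative only.
final class PerQuerySpillTracker
{
  private final Map<String, AtomicLong> spilledBytesByQuery = new ConcurrentHashMap<>();

  // Called whenever a query writes spill data to temporary storage.
  void recordSpill(String queryId, long bytes)
  {
    spilledBytesByQuery.computeIfAbsent(queryId, id -> new AtomicLong()).addAndGet(bytes);
  }

  // Called once when the query completes: returns the total the query spilled
  // (0 if it never spilled) and stops tracking it.
  long finishQuery(String queryId)
  {
    AtomicLong total = spilledBytesByQuery.remove(queryId);
    return total == null ? 0L : total.get();
  }
}
```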



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java:
##########
@@ -0,0 +1,87 @@
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.query.groupby.epinephelinae.LimitedTemporaryStorage;
+
+import javax.inject.Inject;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Collects stats for group by queries like used merged buffer count, spilled bytes and group by resource acquisition time.
+ */
+@LazySingleton
+public class GroupByStatsProvider
+{
+  private final AtomicLong resourceAcquisitionTimeNs = new AtomicLong(0);
+  private final AtomicLong resourceAcquisitionCount = new AtomicLong(0);

Review Comment:
   ```suggestion
     private final AtomicLong mergeBufferAcquisitionTimeNs = new AtomicLong(0);
     private final AtomicLong mergeBufferAcquisitionCount = new AtomicLong(0);
   ```



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java:
##########
@@ -0,0 +1,94 @@
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.collections.BlockingPool;
+import org.apache.druid.guice.annotations.Merging;
+import org.apache.druid.query.groupby.epinephelinae.LimitedTemporaryStorage;
+
+import javax.inject.Inject;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Collects stats for group by queries like used merged buffer count, spilled bytes and group by resource acquisition time.
+ */
+public class GroupByStatsProvider
+{
+  private final AtomicLong groupByResourceAcquisitionTimeNs = new AtomicLong(0);
+  private final AtomicLong groupByResourceAcquisitionCount = new AtomicLong(0);
+
+  private final BlockingPool<ByteBuffer> blockingPool;
+  private final ConcurrentLinkedQueue<LimitedTemporaryStorage> temporaryStorages;
+
+  @Inject
+  public GroupByStatsProvider(@Merging BlockingPool<ByteBuffer> blockingPool)
+  {
+    this.blockingPool = blockingPool;
+    this.temporaryStorages = new ConcurrentLinkedQueue<>();
+  }
+
+  public synchronized void groupByResourceAcquisitionTimeNs(long delayNs)
+  {
+    groupByResourceAcquisitionTimeNs.addAndGet(delayNs);
+    groupByResourceAcquisitionCount.incrementAndGet();
+  }
+
+  public synchronized long getAndResetGroupByResourceAcquisitionStats()
+  {
+    long average = (groupByResourceAcquisitionTimeNs.get() / groupByResourceAcquisitionCount.get());
+
+    groupByResourceAcquisitionTimeNs.set(0);
+    groupByResourceAcquisitionCount.set(0);
+
+    return average;
+  }
+
+  public long getAcquiredMergeBufferCount()
+  {
+    return blockingPool.getUsedBufferCount();
+  }
+
+  public void registerTemporaryStorage(LimitedTemporaryStorage temporaryStorage)
+  {
+    temporaryStorages.add(temporaryStorage);
+  }
+
+  public long getSpilledBytes()

Review Comment:
   What if a registered `LimitedTemporaryStorage` is not closed between two invocations of `getSpilledBytes()`? Would the code double-count the stored bytes? Perhaps that is the intended behavior. If so, we should clarify the intent behind the metric and decide whether we want that or not.
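To make the double-counting concern concrete: if the provider sums the current size of every still-open storage on each poll, a storage that stays open across two polls is reported in both. The sketch contrasts that gauge-style reading with a delta-style reading that only reports bytes written since the previous poll. `OpenStorage` and `SpillPolling` are hypothetical stand-ins (single-threaded for clarity), not the PR's classes; which reading is right depends on whether the metric is meant to be a gauge (current on-disk usage) or a counter (bytes spilled per period).

```java
import java.util.List;

// Hypothetical stand-in for a temporary storage whose size only grows while open.
final class OpenStorage
{
  long currentSize = 0;
  long lastReportedSize = 0;
}

final class SpillPolling
{
  // Gauge-style: sum of current sizes. A storage open across two polls is counted in both.
  static long currentBytes(List<OpenStorage> storages)
  {
    long total = 0;
    for (OpenStorage s : storages) {
      total += s.currentSize;
    }
    return total;
  }

  // Counter-style: only bytes written since the previous poll, so nothing is counted twice.
  static long bytesSinceLastPoll(List<OpenStorage> storages)
  {
    long delta = 0;
    for (OpenStorage s : storages) {
      delta += s.currentSize - s.lastReportedSize;
      s.lastReportedSize = s.currentSize;
    }
    return delta;
  }
}
```

If a storage holds 100 bytes and nothing new is written between two polls, the gauge-style reading reports 100 both times (200 if the two values are summed as a counter), while the delta-style reading reports 100 and then 0.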



##########
processing/src/main/java/org/apache/druid/collections/BlockingPool.java:
##########
@@ -49,4 +49,9 @@ public interface BlockingPool<T>
    * @return count of pending requests
    */
   long getPendingRequests();
+
+  /**
+   * @return number of used buffers from the pool
+   */
+  long getUsedBufferCount();

Review Comment:
   BlockingPool is generic. It shouldn't reference buffers. 
   ```suggestion
     long getUsedResourcesCount();
   ```
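A minimal sketch of a generic pool whose usage method is phrased in terms of resources rather than buffers, assuming a fixed-capacity pool backed by a `BlockingQueue`. `SimpleBlockingPool` is hypothetical and far simpler than Druid's `DefaultBlockingPool`; it only shows that the count can be derived without knowing what `T` is.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Hypothetical fixed-size pool; T can be any resource type, not only ByteBuffer.
final class SimpleBlockingPool<T>
{
  private final int capacity;
  private final BlockingQueue<T> available;

  SimpleBlockingPool(Supplier<T> generator, int capacity)
  {
    this.capacity = capacity;
    this.available = new ArrayBlockingQueue<>(capacity);
    for (int i = 0; i < capacity; i++) {
      available.add(generator.get());
    }
  }

  // Blocks until a resource is free.
  T take()
  {
    try {
      return available.take();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }

  // Returns a previously taken resource; add() throws if more are returned than were taken.
  void giveBack(T resource)
  {
    available.add(resource);
  }

  // Resources currently checked out of the pool; nothing here refers to buffers.
  long getUsedResourcesCount()
  {
    return capacity - available.size();
  }
}
```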



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java:
##########
@@ -0,0 +1,87 @@
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.query.groupby.epinephelinae.LimitedTemporaryStorage;
+
+import javax.inject.Inject;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Collects stats for group by queries like used merged buffer count, spilled bytes and group by resource acquisition time.
+ */
+@LazySingleton
+public class GroupByStatsProvider
+{
+  private final AtomicLong resourceAcquisitionTimeNs = new AtomicLong(0);
+  private final AtomicLong resourceAcquisitionCount = new AtomicLong(0);
+
+  private final ConcurrentLinkedQueue<LimitedTemporaryStorage> temporaryStorages;
+
+  @Inject
+  public GroupByStatsProvider()
+  {
+    this.temporaryStorages = new ConcurrentLinkedQueue<>();
+  }
+
+  public synchronized void groupByResourceAcquisitionTimeNs(long delayNs)
+  {
+    resourceAcquisitionTimeNs.addAndGet(delayNs);
+    resourceAcquisitionCount.incrementAndGet();
+  }
+
+  public synchronized long getAndResetGroupByResourceAcquisitionStats()
+  {
+    long average = resourceAcquisitionCount.get() != 0 ?
+                   (resourceAcquisitionTimeNs.get() / resourceAcquisitionCount.get()) : 0;
+
+    resourceAcquisitionTimeNs.set(0);
+    resourceAcquisitionCount.set(0);
+
+    return average;
+  }
+
+  public void registerTemporaryStorage(LimitedTemporaryStorage temporaryStorage)

Review Comment:
   How often does this method need to be called per group by query, and what does that depend on? I feel there's an inherent cost to tracking the spilled bytes this way. It's probably still fine, but I wonder if the following would be more efficient:
   
   1. Keep a running counter of the size occupied.
   2. `LimitedTemporaryStorage` implements `Closeable`. Wherever a `LimitedTemporaryStorage` is closed, subtract its size from the counter. To avoid miscounting, keep a hash set of the open temporary storages that the counter is tracking, and check that the storage being closed is actually in that set before subtracting.
   
   Note: My approach is more convoluted, so if the current version doesn't have much of a performance downside, I think it's fine as is.
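A sketch of the running-counter alternative, assuming a storage can report each write to a shared tracker and that `close()` is the single point where it is released. `SpillTracker` and `TrackedStorage` are hypothetical names, not Druid classes. The set membership check makes closing idempotent, so a storage closed twice, or never registered, cannot push the counter negative.

```java
import java.io.Closeable;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical node-wide tracker of bytes held by currently open temporary storages.
final class SpillTracker
{
  private final AtomicLong occupiedBytes = new AtomicLong();
  private final Set<TrackedStorage> open = ConcurrentHashMap.newKeySet();

  void register(TrackedStorage storage)
  {
    open.add(storage);
  }

  void onWrite(TrackedStorage storage, long bytes)
  {
    if (open.contains(storage)) {
      occupiedBytes.addAndGet(bytes);
    }
  }

  // Subtract only if this storage was still tracked; remove() returns false otherwise.
  void onClose(TrackedStorage storage)
  {
    if (open.remove(storage)) {
      occupiedBytes.addAndGet(-storage.size());
    }
  }

  long getOccupiedBytes()
  {
    return occupiedBytes.get();
  }
}

// Hypothetical storage that reports its writes; write and close are serialized per storage.
final class TrackedStorage implements Closeable
{
  private final SpillTracker tracker;
  private long size = 0;

  TrackedStorage(SpillTracker tracker)
  {
    this.tracker = tracker;
    tracker.register(this);
  }

  synchronized void write(long bytes)
  {
    size += bytes;
    tracker.onWrite(this, bytes);
  }

  synchronized long size()
  {
    return size;
  }

  @Override
  public synchronized void close()
  {
    tracker.onClose(this);
  }
}
```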



##########
docs/configuration/index.md:
##########
@@ -401,7 +401,7 @@ Metric monitoring is an essential part of Druid operations. The following monito
 |`org.apache.druid.java.util.metrics.CgroupV2MemoryMonitor`| **EXPERIMENTAL** Reports memory usage from `memory.current` and `memory.max` files. Only applicable to `cgroupv2`.|
 |`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical services. Available only on Historical services.|
 |`org.apache.druid.server.metrics.SegmentStatsMonitor` | **EXPERIMENTAL** Reports statistics about segments on Historical services. Available only on Historical services. Not to be used when lazy loading is configured.|
-|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|
+|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted. It also reports stats for group by queries.|

Review Comment:
   Should this be a separate monitor? I don't see any downside to merging the group by statistics into `QueryCountStatsMonitor`, but I don't see any benefit either.
   However, merging group by statistics with "query count stats" feels incongruent to me.



##########
docs/configuration/index.md:
##########
@@ -401,7 +401,7 @@ Metric monitoring is an essential part of Druid operations. The following monito
 |`org.apache.druid.java.util.metrics.CgroupV2MemoryMonitor`| **EXPERIMENTAL** Reports memory usage from `memory.current` and `memory.max` files. Only applicable to `cgroupv2`.|
 |`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical services. Available only on Historical services.|
 |`org.apache.druid.server.metrics.SegmentStatsMonitor` | **EXPERIMENTAL** Reports statistics about segments on Historical services. Available only on Historical services. Not to be used when lazy loading is configured.|
-|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|
+|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted. It also reports stats for group by queries.|

Review Comment:
   ```suggestion
   |`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted. It also reports statistics for the group by queries.|
   ```



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.query.groupby.epinephelinae.LimitedTemporaryStorage;
+
+import javax.inject.Inject;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Collects stats for group by queries like used merged buffer count, spilled bytes and group by resource acquisition time.
+ */
+@LazySingleton
+public class GroupByStatsProvider
+{
+  private final AtomicLong resourceAcquisitionTimeNs = new AtomicLong(0);
+  private final AtomicLong resourceAcquisitionCount = new AtomicLong(0);

Review Comment:
   Making them atomic seems redundant: both need to be updated together, so guaranteeing atomicity for each variable individually is not useful. The current code does not rely on the `AtomicLong` semantics anyway, since both variables are already guarded by the provider's lock.
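A minimal sketch of the lock-guarded alternative, assuming every read and write goes through `synchronized` methods on the same object; the monitor then provides both atomicity of the pair and cross-thread visibility, so plain `long` fields suffice. The class name is hypothetical; the field names follow the `mergeBuffer...` renaming suggested earlier in this review.

```java
// Hypothetical lock-guarded counters; no AtomicLong needed because every access is synchronized.
final class MergeBufferAcquisitionStats
{
  private long mergeBufferAcquisitionTimeNs = 0;
  private long mergeBufferAcquisitionCount = 0;

  synchronized void record(long delayNs)
  {
    mergeBufferAcquisitionTimeNs += delayNs;
    mergeBufferAcquisitionCount++;
  }

  // Reads and resets both values as one consistent pair: {timeNs, count}.
  synchronized long[] getAndReset()
  {
    long[] pair = {mergeBufferAcquisitionTimeNs, mergeBufferAcquisitionCount};
    mergeBufferAcquisitionTimeNs = 0;
    mergeBufferAcquisitionCount = 0;
    return pair;
  }
}
```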



##########
processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java:
##########
@@ -119,7 +119,7 @@ public List<ReferenceCountingResourceHolder<T>> takeBatch(final int elementNum)
       throw new RuntimeException(e);
     }
     finally {
-      pendingRequests.incrementAndGet();
+      pendingRequests.decrementAndGet();

Review Comment:
   nice catch 💯 
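The fix restores the usual pattern for an in-flight gauge: increment before the blocking call and decrement in `finally`, so the count returns to its previous value whether the call succeeds, times out, or is interrupted. A stripped-down sketch, with `PendingRequestGauge` as a hypothetical class rather than the actual `DefaultBlockingPool` code:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of tracking callers that are waiting on a blocking call.
final class PendingRequestGauge
{
  private final AtomicLong pendingRequests = new AtomicLong();
  private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

  // Returns the element, or null if none arrived within the timeout.
  String pollWithTimeout(long timeoutMs)
  {
    pendingRequests.incrementAndGet();
    try {
      return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    finally {
      // Must undo the increment above; incrementing here would make the gauge grow forever.
      pendingRequests.decrementAndGet();
    }
  }

  void offer(String value)
  {
    queue.offer(value);
  }

  long getPendingRequests()
  {
    return pendingRequests.get();
  }
}
```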



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
