kfaraz commented on code in PR #18314:
URL: https://github.com/apache/druid/pull/18314#discussion_r2235345467


##########
docs/operations/metrics.md:
##########
@@ -92,6 +92,8 @@ Most metric values reset each emission period, as specified 
in `druid.monitoring
 |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the 
disk.|This metric is only available if the `GroupByStatsMonitor` module is 
included.|Varies|
 |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy 
queries.|This metric is only available if the `GroupByStatsMonitor` module is 
included.|Varies|
 |`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This 
metric is only available if the `GroupByStatsMonitor` module is 
included.|Varies|
+|`serverview/segment/added`|Number of segments added to the broker|This metric 
is only available if the `BrokerSegmentStatsMonitor` module is included.|Varies|
+|`serverview/segment/removed`|Number of segments removed from the broker|This 
metric is only available if the `BrokerSegmentStatsMonitor` module is 
included.|Varies|

Review Comment:
   ```suggestion
   |`serverview/segment/removed`|Number of segments removed by the Broker from 
its server view.|This metric is only available if the 
`BrokerSegmentStatsMonitor` module is included.|Varies|
   ```



##########
docs/operations/metrics.md:
##########
@@ -93,6 +93,9 @@ Most metric values reset each emission period, as specified 
in `druid.monitoring
 |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy 
queries.|This metric is only available if the `GroupByStatsMonitor` module is 
included.|Varies|
 |`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This 
metric is only available if the `GroupByStatsMonitor` module is 
included.|Varies|
 
+|`serverview/segment/added`|Number of segments added to the broker|This metric 
is only available if the `BrokerSegmentStatsMonitor` module is included.|Varies|

Review Comment:
   Please move these next to the `serverview/sync/healthy` metric since they 
are related to that metric.



##########
server/src/main/java/org/apache/druid/client/BrokerServerView.java:
##########
@@ -436,4 +449,55 @@ public List<ImmutableDruidServer> getDruidServers()
                   .map(queryableDruidServer -> 
queryableDruidServer.getServer().toImmutableDruidServer())
                   .collect(Collectors.toList());
   }
+
+  private void incrementSegmentCount(RowKey key)
+  {
+    totalSegmentAddCount.compute(key, (k, currentValue) -> currentValue == 
null ? 1 : currentValue + 1);
+  }
+
+  @Override
+  public Map<RowKey, Long> getSegmentAddedCount()
+  {
+    final ConcurrentHashMap<RowKey, Long> total = totalSegmentAddCount;
+    synchronized (totalSegmentAddCount) {
+      final Map<RowKey, Long> delta = getDeltaValues(total, 
previousSegmentAddCount);
+      previousSegmentAddCount = total;
+      return delta;
+    }
+  }
+
+  @Override
+  public Map<RowKey, Long> getSegmentRemovedCount()
+  {
+    final ConcurrentHashMap<RowKey, Long> total = totalSegmentRemoveCount;
+    synchronized (totalSegmentRemoveCount) {
+      final Map<RowKey, Long> delta = getDeltaValues(total, 
previousSegmentRemoveCount);
+      previousSegmentRemoveCount = total;
+      return delta;
+    }
+  }
+
+  private void decrementSegmentCount(RowKey key)
+  {
+    totalSegmentRemoveCount.compute(key, (k, currentValue) -> currentValue == 
null ? 1 : currentValue + 1);
+  }
+
+  private Map<RowKey, Long> getDeltaValues(Map<RowKey, Long> total, 
Map<RowKey, Long> prev)
+  {
+    final Map<RowKey, Long> deltaValues = new HashMap<>();
+    total.forEach(
+        (dataSource, totalCount) -> deltaValues.put(
+            dataSource,
+            totalCount - prev.getOrDefault(dataSource, 0L)
+        )
+    );
+    return deltaValues;
+  }
+
+  private static RowKey getMetricKey(final DataSegment segment)
+  {
+    return RowKey.with(Dimension.DATASOURCE, segment.getDataSource())
+                 .with(Dimension.INTERVAL, 
String.valueOf(segment.getInterval()))
+                 .and(Dimension.VERSION, segment.getVersion());

Review Comment:
   Since we now emit one added/removed event per segment, rather than one per 
interval and version, we can just use the dimensions `dataSource`, `server` and 
`description` (which contains the full segment ID).
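
As a rough illustration of the suggested dimension set, the sketch below builds a per-segment dimension map. It uses a plain `Map` as a stand-in for Druid's `RowKey`; the class name `SegmentMetricDimensions` and the method `forSegment` are hypothetical and not part of the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not a Druid class: a plain Map stands in for RowKey.
class SegmentMetricDimensions
{
  // Builds the three dimensions named in the review comment for one segment event.
  static Map<String, String> forSegment(String dataSource, String server, String segmentId)
  {
    final Map<String, String> dims = new LinkedHashMap<>();
    dims.put("dataSource", dataSource);
    dims.put("server", server);
    // "description" carries the full segment ID, so interval and version
    // do not need to be separate dimensions.
    dims.put("description", segmentId);
    return dims;
  }
}
```

Because the segment ID already encodes the interval and version, each key identifies exactly one segment on one server.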



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/mariadb/EmbeddedMariaDBMetadataStoreTest.java:
##########
@@ -31,6 +32,9 @@ public class EmbeddedMariaDBMetadataStoreTest extends 
EmbeddedIndexTaskTest
   @Override
   public EmbeddedDruidCluster createCluster()
   {
+    final EmbeddedBroker broker = new EmbeddedBroker()
+        .addProperty("druid.monitoring.monitors", 
"[\"org.apache.druid.server.metrics.BrokerSegmentStatsMonitor\"]")
+        .addProperty("druid.monitoring.emissionPeriod", "PT0.1s");

Review Comment:
   We probably don't need this change since the `broker` member field in the 
super class already has the required properties. The `createCluster` method 
will just use that broker.



##########
docs/operations/metrics.md:
##########
@@ -92,6 +92,8 @@ Most metric values reset each emission period, as specified 
in `druid.monitoring
 |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the 
disk.|This metric is only available if the `GroupByStatsMonitor` module is 
included.|Varies|
 |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy 
queries.|This metric is only available if the `GroupByStatsMonitor` module is 
included.|Varies|
 |`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This 
metric is only available if the `GroupByStatsMonitor` module is 
included.|Varies|
+|`serverview/segment/added`|Number of segments added to the broker|This metric 
is only available if the `BrokerSegmentStatsMonitor` module is included.|Varies|

Review Comment:
   ```suggestion
   |`serverview/segment/added`|Number of segments discovered and added by the 
Broker to its server view.|This metric is only available if the 
`BrokerSegmentStatsMonitor` module is included.|Varies|
   ```



##########
server/src/main/java/org/apache/druid/client/BrokerServerView.java:
##########
@@ -76,6 +80,12 @@ public class BrokerServerView implements TimelineServerView
   private final CountDownLatch initialized = new CountDownLatch(1);
   private final FilteredServerInventoryView baseView;
   private final BrokerViewOfCoordinatorConfig brokerViewOfCoordinatorConfig;
+  private final ConcurrentHashMap<RowKey, Long> totalSegmentAddCount = new 
ConcurrentHashMap<>();
+  private final ConcurrentHashMap<RowKey, Long> totalSegmentRemoveCount = new 
ConcurrentHashMap<>();
+  @GuardedBy("totalSegmentAddCount")
+  private Map<RowKey, Long> previousSegmentAddCount = new HashMap<>();
+  @GuardedBy("totalSegmentRemoveCount")
+  private Map<RowKey, Long> previousSegmentRemoveCount = new HashMap<>();

Review Comment:
   Rather than tracking total and previous counts in these maps, we can just 
keep a list of events (added, removed).
   When `getSegmentsAdded` is called, we just flush out that list.
   The count for any given row key will always be 1 if we use segment ID and 
server in the dimensions.
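
A minimal sketch of the flush-on-read idea, under stated assumptions: `SegmentEventBuffer`, `SegmentEvent`, `recordAdded`, `recordRemoved`, `flushAdded` and `flushRemoved` are hypothetical names chosen for illustration, and the real change would live in `BrokerServerView` and use Druid's own types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Druid code: buffers segment events until the monitor reads them.
class SegmentEventBuffer
{
  // One added/removed event. With segment ID and server as dimensions,
  // every event is emitted with a count of 1.
  static final class SegmentEvent
  {
    final String dataSource;
    final String server;
    final String segmentId;

    SegmentEvent(String dataSource, String server, String segmentId)
    {
      this.dataSource = dataSource;
      this.server = server;
      this.segmentId = segmentId;
    }
  }

  private final Object lock = new Object();
  private List<SegmentEvent> added = new ArrayList<>();
  private List<SegmentEvent> removed = new ArrayList<>();

  void recordAdded(SegmentEvent event)
  {
    synchronized (lock) {
      added.add(event);
    }
  }

  void recordRemoved(SegmentEvent event)
  {
    synchronized (lock) {
      removed.add(event);
    }
  }

  // Returns all added events since the last flush and starts a fresh list.
  List<SegmentEvent> flushAdded()
  {
    synchronized (lock) {
      final List<SegmentEvent> flushed = added;
      added = new ArrayList<>();
      return flushed;
    }
  }

  // Returns all removed events since the last flush and starts a fresh list.
  List<SegmentEvent> flushRemoved()
  {
    synchronized (lock) {
      final List<SegmentEvent> flushed = removed;
      removed = new ArrayList<>();
      return flushed;
    }
  }
}
```

Swapping in a new list on each flush means no "previous" snapshot has to be kept, so there is no delta computation between emission periods.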



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

