kfaraz commented on code in PR #18314:
URL: https://github.com/apache/druid/pull/18314#discussion_r2228259208


##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java:
##########
@@ -176,8 +178,10 @@ void setUpEach()
                       .hasDimension(DruidMetrics.DATASOURCE, 
Collections.singletonList(dataSource)),
         agg -> agg.hasSumAtLeast(totalRows)
     );
-    broker.latchableEmitter().waitForEvent(
-        event -> event.hasDimension(DruidMetrics.DATASOURCE, dataSource)
+    broker.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("segment/available/count")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasCountAtLeast(10)

Review Comment:
   I don't think waiting for the count aggregate is very useful. By invoking 
`agg.hasCountAtLeast(10)`, we are essentially waiting for the above metric to 
have been emitted 10 times (with any value or dimensions) since the broker 
started.
   
   We need to identify all the intervals for which we expect segments to be 
available, and wait for each of them one by one with the expected count.
   Given that this test uses the dataset 
`wikiticker-2015-09-12-sampled.json.gz` and the Kafka supervisor is set to DAY 
granularity, I would expect there to be only 1 segment.
   But you can double-check by putting a debug point in the test and checking 
the list of segments in the web console.
   
   So the wait condition might become something like:
   
   ```java
   broker.latchableEmitter().waitForEvent(
           event -> event.hasMetricName("segment/available/count")
                         .hasDimension(DruidMetrics.DATASOURCE, dataSource)
                         .hasDimension(DruidMetrics.INTERVAL, "2015-09-12")
                         .hasValue(1)
   );
   ```
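
To illustrate why a count-based condition is weaker than a value-based one, here is a minimal, self-contained sketch. It assumes the metric can be emitted repeatedly (for example, once per monitor cycle). `MetricEvent`, `countAtLeast` and `hasIntervalWithValue` are hypothetical names for this illustration only and are not part of the embedded-test framework.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not the LatchableEmitter API.
final class WaitConditionDemo
{
  // Count-based condition: true once enough events exist, whatever they report.
  static boolean countAtLeast(List<MetricEvent> events, int minCount)
  {
    return events.size() >= minCount;
  }

  // Value-based condition: true only when the given interval reports the expected value.
  static boolean hasIntervalWithValue(List<MetricEvent> events, String interval, long expected)
  {
    return events.stream().anyMatch(e -> e.interval.equals(interval) && e.value == expected);
  }

  public static void main(String[] args)
  {
    List<MetricEvent> events = new ArrayList<>();
    // Ten emissions for a different interval satisfy the count condition...
    for (int i = 0; i < 10; i++) {
      events.add(new MetricEvent("2015-09-11", 1));
    }
    System.out.println(countAtLeast(events, 10));
    // ...but not the condition on the interval the test actually cares about.
    System.out.println(hasIntervalWithValue(events, "2015-09-12", 1));
  }
}

// Hypothetical event type carrying only the fields needed for the comparison.
final class MetricEvent
{
  final String interval;
  final long value;

  MetricEvent(String interval, long value)
  {
    this.interval = interval;
    this.value = value;
  }
}
```

The count condition passes as soon as ten events have accumulated, while the value condition keeps waiting until the expected interval reports the expected count.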



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java:
##########
@@ -113,6 +113,7 @@ public void stop()
             .addProperty("druid.manager.segments.killUnused.bufferPeriod", 
"PT0.1s")
             .addProperty("druid.manager.segments.killUnused.dutyPeriod", 
"PT1s");
     coordinator.addProperty("druid.manager.segments.useIncrementalCache", 
"ifSynced");
+    broker.addProperty("druid.monitoring.monitors", 
"[\"org.apache.druid.server.metrics.BrokerSegmentCountStatsMonitor\"]");

Review Comment:
   Is this needed here?



##########
server/src/main/java/org/apache/druid/client/BrokerServerView.java:
##########
@@ -436,4 +446,42 @@ public List<ImmutableDruidServer> getDruidServers()
                   .map(queryableDruidServer -> 
queryableDruidServer.getServer().toImmutableDruidServer())
                   .collect(Collectors.toList());
   }
+
+  @Override
+  public Map<RowKey, Long> getAvailableSegmentCount()
+  {
+    return CollectionUtils.mapValues(segmentAvailableCount, AtomicLong::get);
+  }
+
+  private static long incrementAndGetLong(ConcurrentHashMap<RowKey, 
AtomicLong> counters, RowKey key)
+  {
+    AtomicLong counter = counters.get(key);
+    if (counter == null) {
+      counter = counters.computeIfAbsent(key, k -> new AtomicLong());
+    }
+    return counter.incrementAndGet();
+  }
+
+  private static long decrementAndGetLong(ConcurrentHashMap<RowKey, 
AtomicLong> counters, RowKey key)
+  {
+    AtomicLong counter = counters.get(key);
+    long cnt = 0L;
+    if (counter != null) {
+      cnt = counter.decrementAndGet();
+      if (cnt == 0) {
+        counters.remove(key, counter);
+      }
+    }
+    return cnt;
+  }
+
+  private static RowKey getMetricKey(final DataSegment segment)
+  {
+    if (segment == null) {
+      return RowKey.empty();

Review Comment:
   Do we ever need to call this method with a `null` segment?
   If not, we should either throw a `DruidException.defensive` or just not have 
a null check at all.
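
A sketch of the fail-fast option, assuming no caller legitimately passes a `null` segment. `Objects.requireNonNull` is used here only as a plain-JDK stand-in for a `DruidException.defensive` check; `MetricKeys`, `Segment` and the key format are hypothetical.

```java
import java.util.Objects;

// Hypothetical illustration; not the actual BrokerServerView code.
final class MetricKeys
{
  // Builds a key for the segment and fails fast on null instead of returning an empty key.
  static String metricKey(Segment segment)
  {
    Objects.requireNonNull(segment, "segment must not be null");
    return segment.dataSource + "|" + segment.interval;
  }

  public static void main(String[] args)
  {
    System.out.println(metricKey(new Segment("wiki", "2015-09-12")));
  }
}

// Hypothetical minimal segment holder used only for this sketch.
final class Segment
{
  final String dataSource;
  final String interval;

  Segment(String dataSource, String interval)
  {
    this.dataSource = dataSource;
    this.interval = interval;
  }
}
```

Failing fast surfaces a programming error at the call site, whereas silently returning an empty key would fold unrelated segments into one counter.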
   



##########
server/src/main/java/org/apache/druid/client/BrokerServerView.java:
##########
@@ -436,4 +446,42 @@ public List<ImmutableDruidServer> getDruidServers()
                   .map(queryableDruidServer -> 
queryableDruidServer.getServer().toImmutableDruidServer())
                   .collect(Collectors.toList());
   }
+
+  @Override
+  public Map<RowKey, Long> getAvailableSegmentCount()
+  {
+    return CollectionUtils.mapValues(segmentAvailableCount, AtomicLong::get);
+  }
+
+  private static long incrementAndGetLong(ConcurrentHashMap<RowKey, 
AtomicLong> counters, RowKey key)

Review Comment:
   Any specific reason this method is static?
   
   If not, then maybe we can simplify it as follows:
   ```suggestion
     private void incrementSegmentCount(RowKey key)
   ```
   
   Same for the decrement method.



##########
server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitorTest.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class BrokerSegmentCountStatsMonitorTest
+{
+  private BrokerSegmentCountStatsProvider statsProvider;
+  private static final RowKey SEGMENT_METRIC_KEY1 = 
RowKey.with(Dimension.DATASOURCE, "dataSource1")
+                                                          
.with(Dimension.VERSION, "2024-01-01T00:00:00.000Z")
+                                                          
.with(Dimension.INTERVAL, "2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z")
+                                                          .build();
+  private static final RowKey SEGMENT_METRIC_KEY2 = 
RowKey.with(Dimension.DATASOURCE, "dataSource2")
+                                                          
.with(Dimension.VERSION, "2024-01-02T00:00:00.000Z")
+                                                          
.with(Dimension.INTERVAL, "2024-01-02T00:00:00.000Z/2024-01-03T00:00:00.000Z")
+                                                          .build();
+
+  @Before
+  public void setUp()
+  {
+    statsProvider = new BrokerSegmentCountStatsProvider()
+    {
+      @Override
+      public Map<RowKey, Long> getAvailableSegmentCount()
+      {
+        return ImmutableMap.of(SEGMENT_METRIC_KEY1, 10L, SEGMENT_METRIC_KEY2, 
5L);

Review Comment:
   ```suggestion
           return Map.of(SEGMENT_METRIC_KEY1, 10L, SEGMENT_METRIC_KEY2, 5L);
   ```



##########
server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitorTest.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class BrokerSegmentCountStatsMonitorTest
+{
+  private BrokerSegmentCountStatsProvider statsProvider;
+  private static final RowKey SEGMENT_METRIC_KEY1 = 
RowKey.with(Dimension.DATASOURCE, "dataSource1")
+                                                          
.with(Dimension.VERSION, "2024-01-01T00:00:00.000Z")
+                                                          
.with(Dimension.INTERVAL, "2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z")
+                                                          .build();
+  private static final RowKey SEGMENT_METRIC_KEY2 = 
RowKey.with(Dimension.DATASOURCE, "dataSource2")
+                                                          
.with(Dimension.VERSION, "2024-01-02T00:00:00.000Z")
+                                                          
.with(Dimension.INTERVAL, "2024-01-02T00:00:00.000Z/2024-01-03T00:00:00.000Z")
+                                                          .build();
+
+  @Before
+  public void setUp()
+  {
+    statsProvider = new BrokerSegmentCountStatsProvider()
+    {
+      @Override
+      public Map<RowKey, Long> getAvailableSegmentCount()
+      {
+        return ImmutableMap.of(SEGMENT_METRIC_KEY1, 10L, SEGMENT_METRIC_KEY2, 
5L);
+      }
+    };
+  }
+
+  @Test
+  public void testMonitor()
+  {
+    final BrokerSegmentCountStatsMonitor monitor = new 
BrokerSegmentCountStatsMonitor(statsProvider);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
+    Assert.assertTrue(monitor.doMonitor(emitter));
+
+    Assert.assertEquals(2, emitter.getNumEmittedEvents());
+
+    emitter.verifyValue("segment/available/count", Map.of("dataSource", 
"dataSource1", "version", "2024-01-01T00:00:00.000Z", "interval", 
"2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z"), 10L);
+    emitter.verifyValue("segment/available/count", Map.of("dataSource", 
"dataSource2", "version", "2024-01-02T00:00:00.000Z", "interval", 
"2024-01-02T00:00:00.000Z/2024-01-03T00:00:00.000Z"), 5L);
+  }
+
+  @Test
+  public void testMonitorWithNullCounts()
+  {
+    final BrokerSegmentCountStatsProvider nullStatsProvider = new 
BrokerSegmentCountStatsProvider()
+    {
+      @Override
+      public Map<RowKey, Long> getAvailableSegmentCount()
+      {
+        return null;
+      }
+    };
+
+    final BrokerSegmentCountStatsMonitor monitor = new 
BrokerSegmentCountStatsMonitor(nullStatsProvider);

Review Comment:
   ```suggestion
       final BrokerSegmentCountStatsMonitor monitor = new 
BrokerSegmentCountStatsMonitor(() -> null);
   ```



##########
server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java:
##########
@@ -64,12 +66,16 @@
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
 
+import static org.apache.druid.server.coordinator.stats.Dimension.INTERVAL;

Review Comment:
   Please remove this import.



##########
server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitorTest.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class BrokerSegmentCountStatsMonitorTest
+{
+  private BrokerSegmentCountStatsProvider statsProvider;
+  private static final RowKey SEGMENT_METRIC_KEY1 = 
RowKey.with(Dimension.DATASOURCE, "dataSource1")
+                                                          
.with(Dimension.VERSION, "2024-01-01T00:00:00.000Z")
+                                                          
.with(Dimension.INTERVAL, "2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z")
+                                                          .build();
+  private static final RowKey SEGMENT_METRIC_KEY2 = 
RowKey.with(Dimension.DATASOURCE, "dataSource2")
+                                                          
.with(Dimension.VERSION, "2024-01-02T00:00:00.000Z")
+                                                          
.with(Dimension.INTERVAL, "2024-01-02T00:00:00.000Z/2024-01-03T00:00:00.000Z")
+                                                          .build();
+
+  @Before
+  public void setUp()
+  {
+    statsProvider = new BrokerSegmentCountStatsProvider()

Review Comment:
   Can be a lambda.
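
Since the provider interface declares a single abstract method, an anonymous class can be replaced by a lambda. A self-contained sketch, using a hypothetical `CountProvider` interface with `String` keys in place of `BrokerSegmentCountStatsProvider` and `RowKey`:

```java
import java.util.Map;

// Hypothetical illustration of replacing an anonymous class with a lambda.
final class LambdaProviderDemo
{
  // Returns the number of entries reported by the provider, treating null as empty.
  static int entryCount(CountProvider provider)
  {
    Map<String, Long> counts = provider.getAvailableSegmentCount();
    return counts == null ? 0 : counts.size();
  }

  public static void main(String[] args)
  {
    // Anonymous-class form, as in the test under review.
    CountProvider anonymous = new CountProvider()
    {
      @Override
      public Map<String, Long> getAvailableSegmentCount()
      {
        return Map.of("dataSource1", 10L, "dataSource2", 5L);
      }
    };
    // Equivalent lambda form.
    CountProvider lambda = () -> Map.of("dataSource1", 10L, "dataSource2", 5L);

    System.out.println(entryCount(anonymous) == entryCount(lambda));
  }
}

// Hypothetical single-method interface standing in for the real provider.
interface CountProvider
{
  Map<String, Long> getAvailableSegmentCount();
}
```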



##########
server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitor.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.inject.Inject;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.druid.server.coordinator.stats.RowKey;
+
+import java.util.Map;
+
+/**
+ * {@Code BrokerSegmentCountStatsMonitor} tracks the currently available 
segments that broker has discovered.

Review Comment:
   ```suggestion
    * Monitor that tracks the number of segments of a datasource currently 
queryable by this Broker.
   ```



##########
server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java:
##########
@@ -194,6 +212,18 @@ public void testMultipleServerAddedRemovedSegment() throws 
Exception
             )
         )
     );
+    Map<RowKey, Long> availableSegmentCount = 
brokerServerView.getAvailableSegmentCount();
+    Map<RowKey, Long> expectedSegmentCount = new HashMap<>();
+    for (DataSegment segment : segments) {
+      expectedSegmentCount.put(
+          RowKey.with(Dimension.DATASOURCE, segment.getDataSource())
+                .with(INTERVAL, segment.getInterval().toString())

Review Comment:
   ```suggestion
                   .with(Dimension.INTERVAL, segment.getInterval().toString())
   ```



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedHighAvailabilityTest.java:
##########
@@ -119,6 +123,10 @@ public void test_switchLeader_andVerifyUsingSysTables()
     // Run sys queries, switch leaders, repeat
     ServerPair<EmbeddedOverlord> overlordPair = createServerPair(overlord1, 
overlord2);
     ServerPair<EmbeddedCoordinator> coordinatorPair = 
createServerPair(coordinator1, coordinator2);
+    broker.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName("segment/available/count")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource)
+    );

Review Comment:
   Probably cleaner to move this line right after `waitForTaskToSucceed` on 
line 121.
   The total number of segments expected to be queryable is 10 across 10 
intervals.



##########
server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitorTest.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class BrokerSegmentCountStatsMonitorTest
+{
+  private BrokerSegmentCountStatsProvider statsProvider;
+  private static final RowKey SEGMENT_METRIC_KEY1 = 
RowKey.with(Dimension.DATASOURCE, "dataSource1")
+                                                          
.with(Dimension.VERSION, "2024-01-01T00:00:00.000Z")
+                                                          
.with(Dimension.INTERVAL, "2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z")
+                                                          .build();
+  private static final RowKey SEGMENT_METRIC_KEY2 = 
RowKey.with(Dimension.DATASOURCE, "dataSource2")
+                                                          
.with(Dimension.VERSION, "2024-01-02T00:00:00.000Z")
+                                                          
.with(Dimension.INTERVAL, "2024-01-02T00:00:00.000Z/2024-01-03T00:00:00.000Z")
+                                                          .build();
+
+  @Before
+  public void setUp()
+  {
+    statsProvider = new BrokerSegmentCountStatsProvider()
+    {
+      @Override
+      public Map<RowKey, Long> getAvailableSegmentCount()
+      {
+        return ImmutableMap.of(SEGMENT_METRIC_KEY1, 10L, SEGMENT_METRIC_KEY2, 
5L);
+      }
+    };
+  }
+
+  @Test
+  public void testMonitor()
+  {
+    final BrokerSegmentCountStatsMonitor monitor = new 
BrokerSegmentCountStatsMonitor(statsProvider);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
+    Assert.assertTrue(monitor.doMonitor(emitter));
+
+    Assert.assertEquals(2, emitter.getNumEmittedEvents());
+
+    emitter.verifyValue("segment/available/count", Map.of("dataSource", 
"dataSource1", "version", "2024-01-01T00:00:00.000Z", "interval", 
"2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z"), 10L);
+    emitter.verifyValue("segment/available/count", Map.of("dataSource", 
"dataSource2", "version", "2024-01-02T00:00:00.000Z", "interval", 
"2024-01-02T00:00:00.000Z/2024-01-03T00:00:00.000Z"), 5L);
+  }
+
+  @Test
+  public void testMonitorWithNullCounts()
+  {
+    final BrokerSegmentCountStatsProvider nullStatsProvider = new 
BrokerSegmentCountStatsProvider()
+    {
+      @Override
+      public Map<RowKey, Long> getAvailableSegmentCount()
+      {
+        return null;
+      }
+    };
+
+    final BrokerSegmentCountStatsMonitor monitor = new 
BrokerSegmentCountStatsMonitor(nullStatsProvider);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
+    Assert.assertTrue(monitor.doMonitor(emitter));
+
+    Assert.assertEquals(0, emitter.getNumEmittedEvents());
+  }
+
+  @Test
+  public void testMonitorWithEmptyCounts()

Review Comment:
   Nit: method name style for better readability in tests:
   
   ```suggestion
     public void test_monitor_withEmptyCounts()
   ```



##########
server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsProvider.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import org.apache.druid.server.coordinator.stats.RowKey;
+
+import java.util.Map;
+
+public interface BrokerSegmentCountStatsProvider
+{
+  /**
+   * Return the number of available segments discovered by broker for a 
datasource.

Review Comment:
   ```suggestion
      * Return the number of segments queryable by a Broker for a datasource, 
interval and version.
   ```



##########
server/src/main/java/org/apache/druid/client/BrokerServerView.java:
##########
@@ -436,4 +446,42 @@ public List<ImmutableDruidServer> getDruidServers()
                   .map(queryableDruidServer -> 
queryableDruidServer.getServer().toImmutableDruidServer())
                   .collect(Collectors.toList());
   }
+
+  @Override
+  public Map<RowKey, Long> getAvailableSegmentCount()
+  {
+    return CollectionUtils.mapValues(segmentAvailableCount, AtomicLong::get);
+  }
+
+  private static long incrementAndGetLong(ConcurrentHashMap<RowKey, 
AtomicLong> counters, RowKey key)
+  {
+    AtomicLong counter = counters.get(key);
+    if (counter == null) {
+      counter = counters.computeIfAbsent(key, k -> new AtomicLong());
+    }
+    return counter.incrementAndGet();

Review Comment:
   Since the map is already a `ConcurrentHashMap`, we can simplify the code to 
just the following (this assumes the map values are changed from `AtomicLong` 
to `Long`):
   
   ```suggestion
       segmentAvailableCount.compute(key, (k, currentValue) -> currentValue == 
null ? 1 : currentValue + 1);
   ```
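
A minimal, self-contained sketch of this counting pattern, covering both increment and decrement. It assumes the map stores plain `Long` values rather than `AtomicLong`. `SegmentCounter` and its `String` keys are hypothetical stand-ins for the `RowKey`-keyed field in `BrokerServerView`. `compute` and `computeIfPresent` are atomic per key on a `ConcurrentHashMap`, and returning `null` from the remapping function removes the entry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the counter logic; not the actual BrokerServerView code.
final class SegmentCounter
{
  private final ConcurrentHashMap<String, Long> counts = new ConcurrentHashMap<>();

  // Atomically adds 1 to the count for the key, starting from 1 if absent.
  void increment(String key)
  {
    counts.compute(key, (k, current) -> current == null ? 1L : current + 1);
  }

  // Atomically subtracts 1 and drops the entry once the count reaches zero.
  void decrement(String key)
  {
    counts.computeIfPresent(key, (k, current) -> current <= 1 ? null : current - 1);
  }

  // Returns an immutable copy of the current counts.
  Map<String, Long> snapshot()
  {
    return Map.copyOf(counts);
  }

  public static void main(String[] args)
  {
    SegmentCounter counter = new SegmentCounter();
    counter.increment("wiki/2015-09-12");
    counter.increment("wiki/2015-09-12");
    counter.decrement("wiki/2015-09-12");
    System.out.println(counter.snapshot());
  }
}
```

Keeping the read-modify-write inside a single `compute` call avoids the separate get, create and remove steps of the `AtomicLong` version, at the cost of boxing a new `Long` on each update.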



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java:
##########
@@ -108,7 +110,8 @@ public void test_runIndexTask_forInlineDatasource()
         agg -> agg.hasSumAtLeast(10)
     );
     broker.latchableEmitter().waitForEvent(
-        event -> event.hasDimension(DruidMetrics.DATASOURCE, dataSource)
+        event -> event.hasMetricName("segment/available/count")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource)

Review Comment:
   The number of segments expected to be queryable is 10 (across 10 different 
days).



##########
server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitorTest.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class BrokerSegmentCountStatsMonitorTest
+{
+  private BrokerSegmentCountStatsProvider statsProvider;
+  private static final RowKey SEGMENT_METRIC_KEY1 = 
RowKey.with(Dimension.DATASOURCE, "dataSource1")
+                                                          
.with(Dimension.VERSION, "2024-01-01T00:00:00.000Z")
+                                                          
.with(Dimension.INTERVAL, "2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z")
+                                                          .build();
+  private static final RowKey SEGMENT_METRIC_KEY2 = 
RowKey.with(Dimension.DATASOURCE, "dataSource2")
+                                                          
.with(Dimension.VERSION, "2024-01-02T00:00:00.000Z")
+                                                          
.with(Dimension.INTERVAL, "2024-01-02T00:00:00.000Z/2024-01-03T00:00:00.000Z")
+                                                          .build();
+
+  @Before
+  public void setUp()
+  {
+    statsProvider = new BrokerSegmentCountStatsProvider()
+    {
+      @Override
+      public Map<RowKey, Long> getAvailableSegmentCount()
+      {
+        return ImmutableMap.of(SEGMENT_METRIC_KEY1, 10L, SEGMENT_METRIC_KEY2, 
5L);
+      }
+    };
+  }
+
+  @Test
+  public void testMonitor()
+  {
+    final BrokerSegmentCountStatsMonitor monitor = new 
BrokerSegmentCountStatsMonitor(statsProvider);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");

Review Comment:
   ```suggestion
       final StubServiceEmitter emitter = new StubServiceEmitter();
   ```



##########
server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitorTest.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class BrokerSegmentCountStatsMonitorTest
+{
+  private BrokerSegmentCountStatsProvider statsProvider;
+  private static final RowKey SEGMENT_METRIC_KEY1 = 
RowKey.with(Dimension.DATASOURCE, "dataSource1")
+                                                          
.with(Dimension.VERSION, "2024-01-01T00:00:00.000Z")
+                                                          
.with(Dimension.INTERVAL, "2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z")
+                                                          .build();
+  private static final RowKey SEGMENT_METRIC_KEY2 = 
RowKey.with(Dimension.DATASOURCE, "dataSource2")
+                                                          
.with(Dimension.VERSION, "2024-01-02T00:00:00.000Z")
+                                                          
.with(Dimension.INTERVAL, "2024-01-02T00:00:00.000Z/2024-01-03T00:00:00.000Z")
+                                                          .build();
+
+  @Before
+  public void setUp()
+  {
+    statsProvider = new BrokerSegmentCountStatsProvider()
+    {
+      @Override
+      public Map<RowKey, Long> getAvailableSegmentCount()
+      {
+        return ImmutableMap.of(SEGMENT_METRIC_KEY1, 10L, SEGMENT_METRIC_KEY2, 
5L);
+      }
+    };
+  }
+
+  @Test
+  public void testMonitor()
+  {
+    final BrokerSegmentCountStatsMonitor monitor = new 
BrokerSegmentCountStatsMonitor(statsProvider);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
+    Assert.assertTrue(monitor.doMonitor(emitter));
+
+    Assert.assertEquals(2, emitter.getNumEmittedEvents());
+
+    emitter.verifyValue("segment/available/count", Map.of("dataSource", 
"dataSource1", "version", "2024-01-01T00:00:00.000Z", "interval", 
"2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z"), 10L);
+    emitter.verifyValue("segment/available/count", Map.of("dataSource", 
"dataSource2", "version", "2024-01-02T00:00:00.000Z", "interval", 
"2024-01-02T00:00:00.000Z/2024-01-03T00:00:00.000Z"), 5L);
+  }
+
+  @Test
+  public void testMonitorWithNullCounts()
+  {
+    final BrokerSegmentCountStatsProvider nullStatsProvider = new 
BrokerSegmentCountStatsProvider()
+    {
+      @Override
+      public Map<RowKey, Long> getAvailableSegmentCount()
+      {
+        return null;
+      }
+    };
+
+    final BrokerSegmentCountStatsMonitor monitor = new 
BrokerSegmentCountStatsMonitor(nullStatsProvider);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
+    Assert.assertTrue(monitor.doMonitor(emitter));
+
+    Assert.assertEquals(0, emitter.getNumEmittedEvents());
+  }
+
+  @Test
+  public void testMonitorWithEmptyCounts()
+  {
+    final BrokerSegmentCountStatsProvider emptyStatsProvider = new 
BrokerSegmentCountStatsProvider()
+    {
+      @Override
+      public Map<RowKey, Long> getAvailableSegmentCount()
+      {
+        return ImmutableMap.of();
+      }
+    };
+
+    final BrokerSegmentCountStatsMonitor monitor = new 
BrokerSegmentCountStatsMonitor(emptyStatsProvider);

Review Comment:
   ```suggestion
       final BrokerSegmentCountStatsMonitor monitor = new 
BrokerSegmentCountStatsMonitor(() -> Map.of());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

