kfaraz commented on code in PR #18195:
URL: https://github.com/apache/druid/pull/18195#discussion_r2194782494


##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java:
##########
@@ -287,6 +294,93 @@ public Map<String, LookupExtractorFactoryContainer> 
fetchLookupsForTierSync(Stri
     }
   }
 
+  @Override
+  public ListenableFuture<CloseableIterator<SegmentStatusInCluster>> 
getAllUsedSegments(
+      @Nullable Set<String> watchedDataSources,
+      Boolean includeOvershadowedStatus,
+      Boolean includeRealtimeSegments
+  )
+  {
+    final StringBuilder pathBuilder = new StringBuilder(
+        "/druid/coordinator/v1/metadata/segments");

Review Comment:
   ```suggestion
       final StringBuilder pathBuilder = new 
StringBuilder("/druid/coordinator/v1/metadata/segments");
   ```



##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java:
##########
@@ -287,6 +294,93 @@ public Map<String, LookupExtractorFactoryContainer> 
fetchLookupsForTierSync(Stri
     }
   }
 
+  @Override
+  public ListenableFuture<CloseableIterator<SegmentStatusInCluster>> 
getAllUsedSegments(
+      @Nullable Set<String> watchedDataSources,
+      Boolean includeOvershadowedStatus,
+      Boolean includeRealtimeSegments
+  )
+  {
+    final StringBuilder pathBuilder = new StringBuilder(
+        "/druid/coordinator/v1/metadata/segments");
+
+    List<String> params = new ArrayList<>();

Review Comment:
   ```suggestion
       final List<String> params = new ArrayList<>();
   ```



##########
server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java:
##########
@@ -531,4 +553,300 @@ public void 
test_fetchLookupsForTierSync_detailedEnabled() throws Exception
         coordinatorClient.fetchLookupsForTierSync("default_tier")
     );
   }
+
+  @Test
+  public void test_getAllUsedSegments() throws JsonProcessingException
+  {
+    final List<DataSegment> segments = ImmutableList.of(SEGMENT1, SEGMENT2, 
SEGMENT3);
+
+    serviceClient.expectAndRespond(
+        new RequestBuilder(
+            HttpMethod.GET,
+            
"/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments"
+        ),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+        jsonMapper.writeValueAsBytes(segments)
+    );
+
+    CloseableIterator<SegmentStatusInCluster> iterator = 
FutureUtils.getUnchecked(
+        coordinatorClient.getAllUsedSegments(null, true, true),
+        true
+    );
+    List<SegmentStatusInCluster> actualSegments = new ArrayList<>();
+    while (iterator.hasNext()) {
+      actualSegments.add(iterator.next());
+    }
+    Assert.assertEquals(
+        segments,
+        actualSegments.stream()
+                      .map(SegmentStatusInCluster::getDataSegment)
+                      .collect(ImmutableList.toImmutableList())
+    );
+  }
+
+  @Test
+  public void test_getAllUsedSegments_noParams() throws JsonProcessingException

Review Comment:
   We should have a test case for multiple datasources as well.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedCoordinatorClientTest.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.SegmentDescriptor;
+import 
org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentStatusInCluster;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class EmbeddedCoordinatorClientTest extends EmbeddedClusterTestBase
+{
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+  private final EmbeddedBroker broker = new EmbeddedBroker();
+  private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedHistorical historical = new EmbeddedHistorical();
+
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+    overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");

Review Comment:
   This shouldn't be needed either.



##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java:
##########
@@ -287,6 +294,93 @@ public Map<String, LookupExtractorFactoryContainer> 
fetchLookupsForTierSync(Stri
     }
   }
 
+  @Override
+  public ListenableFuture<CloseableIterator<SegmentStatusInCluster>> 
getAllUsedSegments(
+      @Nullable Set<String> watchedDataSources,
+      Boolean includeOvershadowedStatus,
+      Boolean includeRealtimeSegments
+  )
+  {
+    final StringBuilder pathBuilder = new StringBuilder(
+        "/druid/coordinator/v1/metadata/segments");
+
+    List<String> params = new ArrayList<>();
+    if (includeOvershadowedStatus != null && includeOvershadowedStatus) {

Review Comment:
   Once we start accepting primitive `boolean` instead of the boxed `Boolean`
   ```suggestion
       if (includeOvershadowedStatus) {
   ```



##########
server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java:
##########
@@ -531,4 +553,300 @@ public void 
test_fetchLookupsForTierSync_detailedEnabled() throws Exception
         coordinatorClient.fetchLookupsForTierSync("default_tier")
     );
   }
+
+  @Test
+  public void test_getAllUsedSegments() throws JsonProcessingException
+  {
+    final List<DataSegment> segments = ImmutableList.of(SEGMENT1, SEGMENT2, 
SEGMENT3);
+
+    serviceClient.expectAndRespond(
+        new RequestBuilder(
+            HttpMethod.GET,
+            
"/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments"
+        ),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+        jsonMapper.writeValueAsBytes(segments)
+    );
+
+    CloseableIterator<SegmentStatusInCluster> iterator = 
FutureUtils.getUnchecked(
+        coordinatorClient.getAllUsedSegments(null, true, true),
+        true
+    );
+    List<SegmentStatusInCluster> actualSegments = new ArrayList<>();
+    while (iterator.hasNext()) {
+      actualSegments.add(iterator.next());
+    }
+    Assert.assertEquals(
+        segments,
+        actualSegments.stream()
+                      .map(SegmentStatusInCluster::getDataSegment)
+                      .collect(ImmutableList.toImmutableList())
+    );
+  }
+
+  @Test
+  public void test_getAllUsedSegments_noParams() throws JsonProcessingException
+  {
+    final List<DataSegment> segments = ImmutableList.of(SEGMENT1, SEGMENT2, 
SEGMENT3);
+
+    serviceClient.expectAndRespond(
+        new RequestBuilder(
+            HttpMethod.GET,
+            "/druid/coordinator/v1/metadata/segments"
+        ),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+        jsonMapper.writeValueAsBytes(segments)
+    );
+
+    CloseableIterator<SegmentStatusInCluster> iterator = 
FutureUtils.getUnchecked(
+        coordinatorClient.getAllUsedSegments(null, false, false),
+        true
+    );
+    List<SegmentStatusInCluster> actualSegments = new ArrayList<>();
+    while (iterator.hasNext()) {
+      actualSegments.add(iterator.next());
+    }
+    Assert.assertEquals(
+        segments,
+        actualSegments.stream()
+                      .map(SegmentStatusInCluster::getDataSegment)
+                      .collect(ImmutableList.toImmutableList())
+    );
+  }
+
+  @Test
+  public void test_getAllUsedSegments_excludeRealtimeSegments() throws 
JsonProcessingException
+  {
+    final List<DataSegment> segments = ImmutableList.of(SEGMENT1, SEGMENT2, 
SEGMENT3);
+
+    serviceClient.expectAndRespond(
+        new RequestBuilder(
+            HttpMethod.GET,
+            "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus"
+        ),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+        jsonMapper.writeValueAsBytes(segments)
+    );
+
+    CloseableIterator<SegmentStatusInCluster> iterator = 
FutureUtils.getUnchecked(
+        coordinatorClient.getAllUsedSegments(null, true, false),
+        true
+    );
+    List<SegmentStatusInCluster> actualSegments = new ArrayList<>();
+    while (iterator.hasNext()) {
+      actualSegments.add(iterator.next());
+    }
+    Assert.assertEquals(
+        segments,
+        actualSegments.stream()
+                      .map(SegmentStatusInCluster::getDataSegment)
+                      .collect(ImmutableList.toImmutableList())
+    );
+  }
+
+  @Test
+  public void test_getAllUsedSegments_filterByDataSource() throws Exception
+  {
+    serviceClient.expectAndRespond(
+        new RequestBuilder(
+            HttpMethod.GET,
+            
"/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments&datasources=abc"
+        ),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+        jsonMapper.writeValueAsBytes(ImmutableList.of(SEGMENT3))
+    );
+
+    CloseableIterator<SegmentStatusInCluster> iterator = 
FutureUtils.getUnchecked(
+        coordinatorClient.getAllUsedSegments(Collections.singleton("abc"), 
true, true),
+        true
+    );
+
+    List<SegmentStatusInCluster> actualSegments = new ArrayList<>();
+    while (iterator.hasNext()) {
+      actualSegments.add(iterator.next());
+    }
+    Assert.assertEquals(
+        ImmutableList.of(SEGMENT3),
+        actualSegments.stream()
+                      .map(SegmentStatusInCluster::getDataSegment)
+                      .collect(ImmutableList.toImmutableList())
+    );
+  }
+
+  @Test
+  public void test_getAllUsedSegments_filterByDataSourceOnly() throws Exception
+  {
+    serviceClient.expectAndRespond(
+        new RequestBuilder(
+            HttpMethod.GET,
+            "/druid/coordinator/v1/metadata/segments?datasources=abc"
+        ),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+        jsonMapper.writeValueAsBytes(ImmutableList.of(SEGMENT3))
+    );
+
+    CloseableIterator<SegmentStatusInCluster> iterator = 
FutureUtils.getUnchecked(
+        coordinatorClient.getAllUsedSegments(Collections.singleton("abc"), 
false, false),
+        true
+    );
+
+    List<SegmentStatusInCluster> actualSegments = new ArrayList<>();
+    while (iterator.hasNext()) {
+      actualSegments.add(iterator.next());
+    }
+    Assert.assertEquals(
+        ImmutableList.of(SEGMENT3),
+        actualSegments.stream()
+                      .map(SegmentStatusInCluster::getDataSegment)
+                      .collect(ImmutableList.toImmutableList())
+    );
+  }
+
+
+  @Test
+  public void test_getRulesForAllDatasources() throws Exception
+  {
+    final Map<String, List<Rule>> rules = ImmutableMap.of(
+        "xyz", List.of(
+            new IntervalLoadRule(
+                Intervals.of("2025-01-01/2025-02-01"),
+                ImmutableMap.of(DruidServer.DEFAULT_TIER, 
DruidServer.DEFAULT_NUM_REPLICANTS),
+                null
+            ),
+            new IntervalLoadRule(
+                Intervals.of("2025-02-01/2025-03-01"),
+                ImmutableMap.of(DruidServer.DEFAULT_TIER, 
DruidServer.DEFAULT_NUM_REPLICANTS),
+                null
+            )
+        )
+    );
+
+    serviceClient.expectAndRespond(
+        new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/rules"),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+        jsonMapper.writeValueAsBytes(rules)
+    );
+
+    Assert.assertEquals(
+        rules,
+        coordinatorClient.getRulesForAllDatasources().get()
+    );
+  }
+
+  @Test
+  public void test_getRules_ForAllDatasources_HttpException_throwsError()
+  {
+    serviceClient.expectAndThrow(
+        new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/rules"),
+        new HttpResponseException(
+            new StringFullResponseHolder(
+                new DefaultHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.NOT_FOUND),
+                StandardCharsets.UTF_8
+            )
+        )
+    );
+
+    RuntimeException thrown = Assert.assertThrows(
+        RuntimeException.class,
+        () -> 
FutureUtils.getUnchecked(coordinatorClient.getRulesForAllDatasources(), true)
+    );
+    Assert.assertTrue(Throwables.getRootCause(thrown) instanceof 
HttpResponseException);
+  }
+
+  @Test
+  public void test_findCurrentLeader() throws Exception
+  {
+    String leaderUrl = "http://localhost:8081";;
+    serviceClient.expectAndRespond(
+        new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/leader"),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_PLAIN),
+        StringUtils.toUtf8(leaderUrl)
+    );
+
+    Assert.assertEquals(
+        leaderUrl,
+        FutureUtils.getUnchecked(coordinatorClient.findCurrentLeader(), 
true).toString()
+    );
+  }
+
+  @Test
+  public void test_findCurrentLeader_invalidUrl()
+  {
+    String invalidLeaderUrl = "{{1234invalidUrl";
+    serviceClient.expectAndRespond(
+        new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/leader"),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_PLAIN),
+        StringUtils.toUtf8(invalidLeaderUrl)
+    );
+    Assert.assertThrows(
+        RuntimeException.class,
+        () -> FutureUtils.getUnchecked(coordinatorClient.findCurrentLeader(), 
true)
+    );
+  }
+
+  @Test
+  public void test_findCurrentLeader_runtimeException()
+  {
+    serviceClient.expectAndThrow(
+        new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/leader"),
+        new RuntimeException("Simulated runtime error")
+    );
+
+    Assert.assertThrows(
+        RuntimeException.class,
+        () -> FutureUtils.getUnchecked(coordinatorClient.findCurrentLeader(), 
true)
+    );
+  }
+
+  @Test
+  public void test_findCurrentLeader_httpResponseException()
+  {
+    serviceClient.expectAndThrow(
+        new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/leader"),
+        new HttpResponseException(
+            new StringFullResponseHolder(
+                new DefaultHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.NOT_FOUND),
+                StandardCharsets.UTF_8
+            )
+        )
+    );
+    // try and assert that the root cause is an HttpResponseException
+    try {
+      FutureUtils.getUnchecked(coordinatorClient.findCurrentLeader(), true);
+    }
+    catch (Exception e) {
+      Throwable throwable = Throwables.getRootCause(e);
+      Assert.assertTrue(throwable instanceof HttpResponseException);
+    }
+  }
+
+  @Test
+  public void test_postLoadRules() throws Exception

Review Comment:
   ```suggestion
     public void test_updateRulesForDatasource() throws Exception
   ```



##########
server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java:
##########
@@ -531,4 +553,300 @@ public void 
test_fetchLookupsForTierSync_detailedEnabled() throws Exception
         coordinatorClient.fetchLookupsForTierSync("default_tier")
     );
   }
+
+  @Test
+  public void test_getAllUsedSegments() throws JsonProcessingException
+  {
+    final List<DataSegment> segments = ImmutableList.of(SEGMENT1, SEGMENT2, 
SEGMENT3);
+
+    serviceClient.expectAndRespond(
+        new RequestBuilder(
+            HttpMethod.GET,
+            
"/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments"
+        ),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+        jsonMapper.writeValueAsBytes(segments)
+    );
+
+    CloseableIterator<SegmentStatusInCluster> iterator = 
FutureUtils.getUnchecked(
+        coordinatorClient.getAllUsedSegments(null, true, true),
+        true
+    );
+    List<SegmentStatusInCluster> actualSegments = new ArrayList<>();
+    while (iterator.hasNext()) {
+      actualSegments.add(iterator.next());
+    }
+    Assert.assertEquals(
+        segments,
+        actualSegments.stream()
+                      .map(SegmentStatusInCluster::getDataSegment)
+                      .collect(ImmutableList.toImmutableList())
+    );
+  }
+
+  @Test
+  public void test_getAllUsedSegments_noParams() throws JsonProcessingException
+  {
+    final List<DataSegment> segments = ImmutableList.of(SEGMENT1, SEGMENT2, 
SEGMENT3);
+
+    serviceClient.expectAndRespond(
+        new RequestBuilder(
+            HttpMethod.GET,
+            "/druid/coordinator/v1/metadata/segments"
+        ),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+        jsonMapper.writeValueAsBytes(segments)
+    );
+
+    CloseableIterator<SegmentStatusInCluster> iterator = 
FutureUtils.getUnchecked(
+        coordinatorClient.getAllUsedSegments(null, false, false),
+        true
+    );
+    List<SegmentStatusInCluster> actualSegments = new ArrayList<>();
+    while (iterator.hasNext()) {
+      actualSegments.add(iterator.next());
+    }
+    Assert.assertEquals(
+        segments,
+        actualSegments.stream()
+                      .map(SegmentStatusInCluster::getDataSegment)
+                      .collect(ImmutableList.toImmutableList())
+    );
+  }
+
+  @Test
+  public void test_getAllUsedSegments_excludeRealtimeSegments() throws 
JsonProcessingException
+  {
+    final List<DataSegment> segments = ImmutableList.of(SEGMENT1, SEGMENT2, 
SEGMENT3);
+
+    serviceClient.expectAndRespond(
+        new RequestBuilder(
+            HttpMethod.GET,
+            "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus"
+        ),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+        jsonMapper.writeValueAsBytes(segments)
+    );
+
+    CloseableIterator<SegmentStatusInCluster> iterator = 
FutureUtils.getUnchecked(
+        coordinatorClient.getAllUsedSegments(null, true, false),
+        true
+    );
+    List<SegmentStatusInCluster> actualSegments = new ArrayList<>();
+    while (iterator.hasNext()) {
+      actualSegments.add(iterator.next());
+    }
+    Assert.assertEquals(
+        segments,
+        actualSegments.stream()
+                      .map(SegmentStatusInCluster::getDataSegment)
+                      .collect(ImmutableList.toImmutableList())
+    );
+  }
+
+  @Test
+  public void test_getAllUsedSegments_filterByDataSource() throws Exception
+  {
+    serviceClient.expectAndRespond(
+        new RequestBuilder(
+            HttpMethod.GET,
+            
"/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments&datasources=abc"
+        ),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+        jsonMapper.writeValueAsBytes(ImmutableList.of(SEGMENT3))
+    );
+
+    CloseableIterator<SegmentStatusInCluster> iterator = 
FutureUtils.getUnchecked(
+        coordinatorClient.getAllUsedSegments(Collections.singleton("abc"), 
true, true),
+        true
+    );
+
+    List<SegmentStatusInCluster> actualSegments = new ArrayList<>();
+    while (iterator.hasNext()) {
+      actualSegments.add(iterator.next());
+    }
+    Assert.assertEquals(
+        ImmutableList.of(SEGMENT3),
+        actualSegments.stream()
+                      .map(SegmentStatusInCluster::getDataSegment)
+                      .collect(ImmutableList.toImmutableList())
+    );
+  }
+
+  @Test
+  public void test_getAllUsedSegments_filterByDataSourceOnly() throws Exception
+  {
+    serviceClient.expectAndRespond(
+        new RequestBuilder(
+            HttpMethod.GET,
+            "/druid/coordinator/v1/metadata/segments?datasources=abc"
+        ),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+        jsonMapper.writeValueAsBytes(ImmutableList.of(SEGMENT3))
+    );
+
+    CloseableIterator<SegmentStatusInCluster> iterator = 
FutureUtils.getUnchecked(
+        coordinatorClient.getAllUsedSegments(Collections.singleton("abc"), 
false, false),
+        true
+    );
+
+    List<SegmentStatusInCluster> actualSegments = new ArrayList<>();
+    while (iterator.hasNext()) {
+      actualSegments.add(iterator.next());
+    }
+    Assert.assertEquals(
+        ImmutableList.of(SEGMENT3),
+        actualSegments.stream()
+                      .map(SegmentStatusInCluster::getDataSegment)
+                      .collect(ImmutableList.toImmutableList())
+    );
+  }
+
+
+  @Test
+  public void test_getRulesForAllDatasources() throws Exception
+  {
+    final Map<String, List<Rule>> rules = ImmutableMap.of(
+        "xyz", List.of(
+            new IntervalLoadRule(
+                Intervals.of("2025-01-01/2025-02-01"),
+                ImmutableMap.of(DruidServer.DEFAULT_TIER, 
DruidServer.DEFAULT_NUM_REPLICANTS),
+                null
+            ),
+            new IntervalLoadRule(
+                Intervals.of("2025-02-01/2025-03-01"),
+                ImmutableMap.of(DruidServer.DEFAULT_TIER, 
DruidServer.DEFAULT_NUM_REPLICANTS),
+                null
+            )
+        )
+    );
+
+    serviceClient.expectAndRespond(
+        new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/rules"),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+        jsonMapper.writeValueAsBytes(rules)
+    );
+
+    Assert.assertEquals(
+        rules,
+        coordinatorClient.getRulesForAllDatasources().get()
+    );
+  }
+
+  @Test
+  public void test_getRules_ForAllDatasources_HttpException_throwsError()
+  {
+    serviceClient.expectAndThrow(
+        new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/rules"),
+        new HttpResponseException(
+            new StringFullResponseHolder(
+                new DefaultHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.NOT_FOUND),

Review Comment:
   I don't think this test case is needed since this API will never throw a 404.



##########
services/src/test/java/org/apache/druid/server/router/CoordinatorRuleManagerTest.java:
##########
@@ -85,11 +87,35 @@ public void testAddingToRulesListThrowingError()
     rules.get(DATASOURCE1).add(new ForeverDropRule());
   }
 
+  @Test
+  public void testThrowingExceptionOnHTTPException()

Review Comment:
   ```suggestion
     public void test_poll_throwsException_ifCoordinatorApiReturnsNotOk()
   ```



##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java:
##########
@@ -287,6 +294,93 @@ public Map<String, LookupExtractorFactoryContainer> 
fetchLookupsForTierSync(Stri
     }
   }
 
+  @Override
+  public ListenableFuture<CloseableIterator<SegmentStatusInCluster>> 
getAllUsedSegments(
+      @Nullable Set<String> watchedDataSources,
+      Boolean includeOvershadowedStatus,
+      Boolean includeRealtimeSegments
+  )
+  {
+    final StringBuilder pathBuilder = new StringBuilder(
+        "/druid/coordinator/v1/metadata/segments");
+
+    List<String> params = new ArrayList<>();
+    if (includeOvershadowedStatus != null && includeOvershadowedStatus) {
+      params.add("includeOvershadowedStatus");
+    }
+    if (includeRealtimeSegments != null && includeRealtimeSegments) {
+      params.add("includeRealtimeSegments");
+    }
+    if (watchedDataSources != null && !watchedDataSources.isEmpty()) {
+      StringBuilder datasourceParam = new StringBuilder("datasources=");
+      for (String dataSource : watchedDataSources) {
+        datasourceParam.append(StringUtils.urlEncode(dataSource));
+      }

Review Comment:
   I don't think this is correct. If we had multiple watched datasources, this 
would give us:
   
   ```
   datasources=ds1ds2ds3
   ```
   when in fact we want
   ```
   datasources=ds1&datasources=ds2&datasources=ds3
   ```



##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java:
##########
@@ -119,4 +123,40 @@ public interface CoordinatorClient
    * @param tier The name of the tier for which the lookup configuration is to 
be fetched.
    */
   Map<String, LookupExtractorFactoryContainer> fetchLookupsForTierSync(String 
tier);
+
+  /**
+   * Returns an iterator over the metadata segments in the cluster.
+   * <p>
+   * API: {@code GET /druid/coordinator/v1/metadata/segments}
+   *
+   * @param watchedDataSources Optional datasources to filter the segments by. 
If null or empty, all segments are returned.
+   * @param includeOvershadowedStatus If true, includes the overshadowed 
status of each segment.
+   * @param includeRealtimeSegments If true, includes realtime segments in the 
result.
+   */
+  ListenableFuture<CloseableIterator<SegmentStatusInCluster>> 
getAllUsedSegments(

Review Comment:
   @uds5501 , I didn't realize that there is another method already exposed by 
this interface `fetchUsedSegment`.
   But it is used to fetch the segments of a single datasource.
   
   We should do a couple of things here:
   - Add a short javadoc to this new method to indicate that it can fetch 
segments for multiple datasources in one go.
   - Also, it turns out that `includeOvershadowedStatus` always has to be true. 
If it is not true, the server returns a response of a different type which will 
throw a parsing error on the client. So, let's get rid of the 
`includeOvershadowedStatus` argument here.
   - Rename the method to `fetchAllUsedSegmentsWithOvershadowedStatus` to 
indicate this fact.
   - The `includeRealtimeSegments` argument should be a primitive `boolean`, 
not a boxed `Boolean`.



##########
server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java:
##########
@@ -531,4 +553,300 @@ public void 
test_fetchLookupsForTierSync_detailedEnabled() throws Exception
         coordinatorClient.fetchLookupsForTierSync("default_tier")
     );
   }
+
+  @Test
+  public void test_getAllUsedSegments() throws JsonProcessingException

Review Comment:
   ```suggestion
     public void test_getAllUsedSegments_includeOvershadowedAndRealtime() 
throws JsonProcessingException
   ```



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedCoordinatorClientTest.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.SegmentDescriptor;
+import 
org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentStatusInCluster;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class EmbeddedCoordinatorClientTest extends EmbeddedClusterTestBase
+{
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+  private final EmbeddedBroker broker = new EmbeddedBroker();
+  private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedHistorical historical = new EmbeddedHistorical();
+
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+    overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+
+    return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+                               .useLatchableEmitter()
+                               .addServer(coordinator)
+                               .addServer(indexer)
+                               .addServer(overlord)
+                               .addServer(historical)
+                               .addServer(broker);
+  }
+
+  @Test
+  public void test_findCurrentLeader()
+  {
+    URI currentLeader = 
cluster.callApi().onLeaderCoordinator(CoordinatorClient::findCurrentLeader);
+    Assertions.assertEquals(8081, currentLeader.getPort());
+  }
+
+  @Test
+  @Timeout(20)
+  public void test_isHandoffComplete()
+  {
+    batchIngest();
+    coordinator.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("segment/loadQueue/success")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasSumAtLeast(1)
+    );
+    final List<DataSegment> segments = new ArrayList<>(
+        
overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource,
 null)
+    );
+    DataSegment firstSegment = segments.get(0);
+    Boolean result = cluster.callApi().onLeaderCoordinator(
+        c -> c.isHandoffComplete(
+            dataSource,
+            new SegmentDescriptor(firstSegment.getInterval(), 
firstSegment.getVersion(), 0)
+        )
+    );
+    Assertions.assertTrue(result);
+  }
+
+  @Test
+  @Timeout(20)
+  public void test_fetchSegment()
+  {
+    batchIngest();
+    final List<DataSegment> segments = new ArrayList<>(
+        
overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource,
 null)
+    );
+    DataSegment firstSegment = segments.get(0);
+    DataSegment result = cluster.callApi().onLeaderCoordinator(
+        c -> c.fetchSegment(
+            dataSource,
+            firstSegment.getId().toString(),
+            true
+        )
+    );
+    Assert.assertEquals(firstSegment, result);
+  }
+
+  @Test
+  @Timeout(20)
+  public void test_fetchServerViewSegments()
+  {
+    batchIngest();
+    coordinator.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("segment/loadQueue/success")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasSumAtLeast(1)
+    );
+
+    final List<DataSegment> segments = new ArrayList<>(
+        
overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource,
 null)
+    );
+    List<Interval> intervals = List.of(segments.get(0).getInterval());
+    Iterable<ImmutableSegmentLoadInfo> segmentLoadInfo = 
cluster.callApi().onLeaderCoordinatorSync(
+        c -> c.fetchServerViewSegments(dataSource, intervals));
+
+    Assertions.assertTrue(segmentLoadInfo.iterator().hasNext());
+    ImmutableSegmentLoadInfo segmentLoad = segmentLoadInfo.iterator().next();
+    Assertions.assertEquals(segments.get(0), segmentLoad.getSegment());
+  }
+
+  @Test
+  @Timeout(20)
+  public void test_fetchUsedSegments()
+  {
+    batchIngest();
+    coordinator.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("segment/loadQueue/success")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasSumAtLeast(1)
+    );
+
+    final List<DataSegment> segments = new ArrayList<>(
+        
overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource,
 null)
+    );
+    List<DataSegment> result = cluster.callApi().onLeaderCoordinator(
+        c -> c.fetchUsedSegments(dataSource, List.of(Intervals.ETERNITY))
+    );
+
+    Assertions.assertEquals(segments.size(), result.size());
+  }
+
+  @Test
+  @Timeout(20)
+  public void test_getAllUsedSegments() throws IOException
+  {
+    batchIngest();
+    coordinator.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("segment/loadQueue/success")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasSumAtLeast(1)
+    );
+
+    try (CloseableIterator<SegmentStatusInCluster> iterator = 
cluster.callApi().onLeaderCoordinator(
+        c -> c.getAllUsedSegments(Set.of(dataSource), true, true))
+    ) {
+      Assertions.assertTrue(iterator.hasNext());
+      SegmentStatusInCluster segmentStatus = iterator.next();
+      Assertions.assertEquals(dataSource, 
segmentStatus.getDataSegment().getDataSource());
+    }
+  }
+
+  @Test
+  @Timeout(20)
+  public void test_loadRules()
+  {
+    Rule broadcastRule = new ForeverBroadcastDistributionRule();
+    cluster.callApi().onLeaderCoordinator(
+        c -> c.updateRulesForDatasource(dataSource, List.of(broadcastRule))
+    );
+    Map<String, List<Rule>> rules = 
cluster.callApi().onLeaderCoordinator(CoordinatorClient::getRulesForAllDatasources);
+    Assertions.assertTrue(!rules.isEmpty());
+    Assertions.assertEquals(List.of(broadcastRule), rules.get(dataSource));
+  }
+
+  private void batchIngest()

Review Comment:
   ```suggestion
     private void runIndexTask()
   ```



##########
services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java:
##########
@@ -180,7 +185,8 @@ public static Map<String, Object> 
deserializeJsonToMap(String payload)
     try {
       return TestHelper.JSON_MAPPER.readValue(
           payload,
-          new TypeReference<>() {}
+          new TypeReference<>()
+          {}

Review Comment:
   😄 
   ```suggestion
             new TypeReference<>() {}
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org


Reply via email to