kfaraz commented on code in PR #18195: URL: https://github.com/apache/druid/pull/18195#discussion_r2194782494
########## server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java: ########## @@ -287,6 +294,93 @@ public Map<String, LookupExtractorFactoryContainer> fetchLookupsForTierSync(Stri } } + @Override + public ListenableFuture<CloseableIterator<SegmentStatusInCluster>> getAllUsedSegments( + @Nullable Set<String> watchedDataSources, + Boolean includeOvershadowedStatus, + Boolean includeRealtimeSegments + ) + { + final StringBuilder pathBuilder = new StringBuilder( + "/druid/coordinator/v1/metadata/segments"); Review Comment: ```suggestion final StringBuilder pathBuilder = new StringBuilder("/druid/coordinator/v1/metadata/segments"); ``` ########## server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java: ########## @@ -287,6 +294,93 @@ public Map<String, LookupExtractorFactoryContainer> fetchLookupsForTierSync(Stri } } + @Override + public ListenableFuture<CloseableIterator<SegmentStatusInCluster>> getAllUsedSegments( + @Nullable Set<String> watchedDataSources, + Boolean includeOvershadowedStatus, + Boolean includeRealtimeSegments + ) + { + final StringBuilder pathBuilder = new StringBuilder( + "/druid/coordinator/v1/metadata/segments"); + + List<String> params = new ArrayList<>(); Review Comment: ```suggestion final List<String> params = new ArrayList<>(); ``` ########## server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java: ########## @@ -531,4 +553,300 @@ public void test_fetchLookupsForTierSync_detailedEnabled() throws Exception coordinatorClient.fetchLookupsForTierSync("default_tier") ); } + + @Test + public void test_getAllUsedSegments() throws JsonProcessingException + { + final List<DataSegment> segments = ImmutableList.of(SEGMENT1, SEGMENT2, SEGMENT3); + + serviceClient.expectAndRespond( + new RequestBuilder( + HttpMethod.GET, + "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments" + ), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(segments) + ); + + CloseableIterator<SegmentStatusInCluster> iterator = FutureUtils.getUnchecked( + coordinatorClient.getAllUsedSegments(null, true, true), + true + ); + List<SegmentStatusInCluster> actualSegments = new ArrayList<>(); + while (iterator.hasNext()) { + actualSegments.add(iterator.next()); + } + Assert.assertEquals( + segments, + actualSegments.stream() + .map(SegmentStatusInCluster::getDataSegment) + .collect(ImmutableList.toImmutableList()) + ); + } + + @Test + public void test_getAllUsedSegments_noParams() throws JsonProcessingException Review Comment: We should have a test case for multiple datasources as well. ########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedCoordinatorClientTest.java: ########## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.server; + +import org.apache.druid.client.ImmutableSegmentLoadInfo; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule; +import org.apache.druid.server.coordinator.rules.Rule; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedClusterApis; +import org.apache.druid.testing.embedded.EmbeddedCoordinator; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedHistorical; +import org.apache.druid.testing.embedded.EmbeddedIndexer; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.indexing.Resources; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentStatusInCluster; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class EmbeddedCoordinatorClientTest extends EmbeddedClusterTestBase +{ + private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator(); + private final EmbeddedBroker broker = new EmbeddedBroker(); + private final EmbeddedIndexer indexer = new EmbeddedIndexer(); + private final EmbeddedOverlord overlord = new EmbeddedOverlord(); + private final EmbeddedHistorical historical = new EmbeddedHistorical(); + + + @Override + protected EmbeddedDruidCluster createCluster() + { + indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s"); + overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s"); Review Comment: This shouldn't be needed either. ########## server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java: ########## @@ -287,6 +294,93 @@ public Map<String, LookupExtractorFactoryContainer> fetchLookupsForTierSync(Stri } } + @Override + public ListenableFuture<CloseableIterator<SegmentStatusInCluster>> getAllUsedSegments( + @Nullable Set<String> watchedDataSources, + Boolean includeOvershadowedStatus, + Boolean includeRealtimeSegments + ) + { + final StringBuilder pathBuilder = new StringBuilder( + "/druid/coordinator/v1/metadata/segments"); + + List<String> params = new ArrayList<>(); + if (includeOvershadowedStatus != null && includeOvershadowedStatus) { Review Comment: Once we start accepting primitive `boolean` instead of the boxed `Boolean` ```suggestion if (includeOvershadowedStatus) { ``` ########## server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java: ########## @@ -531,4 +553,300 @@ public void test_fetchLookupsForTierSync_detailedEnabled() throws Exception coordinatorClient.fetchLookupsForTierSync("default_tier") ); } + + @Test + public void test_getAllUsedSegments() throws JsonProcessingException + { + final List<DataSegment> segments = ImmutableList.of(SEGMENT1, SEGMENT2, SEGMENT3); + + serviceClient.expectAndRespond( + new RequestBuilder( + HttpMethod.GET, + "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments" + ), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(segments) + ); + + CloseableIterator<SegmentStatusInCluster> iterator = FutureUtils.getUnchecked( + coordinatorClient.getAllUsedSegments(null, true, true), + true + ); + List<SegmentStatusInCluster> actualSegments = new ArrayList<>(); + while (iterator.hasNext()) { + actualSegments.add(iterator.next()); + } + Assert.assertEquals( + segments, + actualSegments.stream() + .map(SegmentStatusInCluster::getDataSegment) + .collect(ImmutableList.toImmutableList()) + ); + } + + @Test + public void test_getAllUsedSegments_noParams() throws JsonProcessingException + { + final List<DataSegment> segments = ImmutableList.of(SEGMENT1, SEGMENT2, SEGMENT3); + + serviceClient.expectAndRespond( + new RequestBuilder( + HttpMethod.GET, + "/druid/coordinator/v1/metadata/segments" + ), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(segments) + ); + + CloseableIterator<SegmentStatusInCluster> iterator = FutureUtils.getUnchecked( + coordinatorClient.getAllUsedSegments(null, false, false), + true + ); + List<SegmentStatusInCluster> actualSegments = new ArrayList<>(); + while (iterator.hasNext()) { + actualSegments.add(iterator.next()); + } + Assert.assertEquals( + segments, + actualSegments.stream() + .map(SegmentStatusInCluster::getDataSegment) + .collect(ImmutableList.toImmutableList()) + ); + } + + @Test + public void test_getAllUsedSegments_excludeRealtimeSegments() throws JsonProcessingException + { + final List<DataSegment> segments = ImmutableList.of(SEGMENT1, SEGMENT2, SEGMENT3); + + serviceClient.expectAndRespond( + new RequestBuilder( + HttpMethod.GET, + "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus" + ), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(segments) + ); + + CloseableIterator<SegmentStatusInCluster> iterator = FutureUtils.getUnchecked( + coordinatorClient.getAllUsedSegments(null, true, false), + true + ); + List<SegmentStatusInCluster> actualSegments = new ArrayList<>(); + while (iterator.hasNext()) { + actualSegments.add(iterator.next()); + } + Assert.assertEquals( + segments, + actualSegments.stream() + .map(SegmentStatusInCluster::getDataSegment) + .collect(ImmutableList.toImmutableList()) + ); + } + + @Test + public void test_getAllUsedSegments_filterByDataSource() throws Exception + { + serviceClient.expectAndRespond( + new RequestBuilder( + HttpMethod.GET, + "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments&datasources=abc" + ), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(ImmutableList.of(SEGMENT3)) + ); + + CloseableIterator<SegmentStatusInCluster> iterator = FutureUtils.getUnchecked( + coordinatorClient.getAllUsedSegments(Collections.singleton("abc"), true, true), + true + ); + + List<SegmentStatusInCluster> actualSegments = new ArrayList<>(); + while (iterator.hasNext()) { + actualSegments.add(iterator.next()); + } + Assert.assertEquals( + ImmutableList.of(SEGMENT3), + actualSegments.stream() + .map(SegmentStatusInCluster::getDataSegment) + .collect(ImmutableList.toImmutableList()) + ); + } + + @Test + public void test_getAllUsedSegments_filterByDataSourceOnly() throws Exception + { + serviceClient.expectAndRespond( + new RequestBuilder( + HttpMethod.GET, + "/druid/coordinator/v1/metadata/segments?datasources=abc" + ), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(ImmutableList.of(SEGMENT3)) + ); + + CloseableIterator<SegmentStatusInCluster> iterator = FutureUtils.getUnchecked( + coordinatorClient.getAllUsedSegments(Collections.singleton("abc"), false, false), + true + ); + + List<SegmentStatusInCluster> actualSegments = new ArrayList<>(); + while (iterator.hasNext()) { + actualSegments.add(iterator.next()); + } + Assert.assertEquals( + ImmutableList.of(SEGMENT3), + actualSegments.stream() + .map(SegmentStatusInCluster::getDataSegment) + .collect(ImmutableList.toImmutableList()) + ); + } + + + @Test + public void test_getRulesForAllDatasources() throws Exception + { + final Map<String, List<Rule>> rules = ImmutableMap.of( + "xyz", List.of( + new IntervalLoadRule( + Intervals.of("2025-01-01/2025-02-01"), + ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS), + null + ), + new IntervalLoadRule( + Intervals.of("2025-02-01/2025-03-01"), + ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS), + null + ) + ) + ); + + serviceClient.expectAndRespond( + new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/rules"), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(rules) + ); + + Assert.assertEquals( + rules, + coordinatorClient.getRulesForAllDatasources().get() + ); + } + + @Test + public void test_getRules_ForAllDatasources_HttpException_throwsError() + { + serviceClient.expectAndThrow( + new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/rules"), + new HttpResponseException( + new StringFullResponseHolder( + new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND), + StandardCharsets.UTF_8 + ) + ) + ); + + RuntimeException thrown = Assert.assertThrows( + RuntimeException.class, + () -> FutureUtils.getUnchecked(coordinatorClient.getRulesForAllDatasources(), true) + ); + Assert.assertTrue(Throwables.getRootCause(thrown) instanceof HttpResponseException); + } + + @Test + public void test_findCurrentLeader() throws Exception + { + String leaderUrl = "http://localhost:8081"; + serviceClient.expectAndRespond( + new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/leader"), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_PLAIN), + StringUtils.toUtf8(leaderUrl) + ); + + Assert.assertEquals( + leaderUrl, + FutureUtils.getUnchecked(coordinatorClient.findCurrentLeader(), true).toString() + ); + } + + @Test + public void test_findCurrentLeader_invalidUrl() + { + String invalidLeaderUrl = "{{1234invalidUrl"; + serviceClient.expectAndRespond( + new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/leader"), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_PLAIN), + StringUtils.toUtf8(invalidLeaderUrl) + ); + Assert.assertThrows( + RuntimeException.class, + () -> FutureUtils.getUnchecked(coordinatorClient.findCurrentLeader(), true) + ); + } + + @Test + public void test_findCurrentLeader_runtimeException() + { + serviceClient.expectAndThrow( + new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/leader"), + new RuntimeException("Simulated runtime error") + ); + + Assert.assertThrows( + RuntimeException.class, + () -> FutureUtils.getUnchecked(coordinatorClient.findCurrentLeader(), true) + ); + } + + @Test + public void test_findCurrentLeader_httpResponseException() + { + serviceClient.expectAndThrow( + new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/leader"), + new HttpResponseException( + new StringFullResponseHolder( + new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND), + StandardCharsets.UTF_8 + ) + ) + ); + // try and assert that the root cause is an HttpResponseException + try { + FutureUtils.getUnchecked(coordinatorClient.findCurrentLeader(), true); + } + catch (Exception e) { + Throwable throwable = Throwables.getRootCause(e); + Assert.assertTrue(throwable instanceof HttpResponseException); + } + } + + @Test + public void test_postLoadRules() throws Exception Review Comment: ```suggestion public void test_updateRulesForDatasource() throws Exception ``` ########## server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java: ########## @@ -531,4 +553,300 @@ public void test_fetchLookupsForTierSync_detailedEnabled() throws Exception coordinatorClient.fetchLookupsForTierSync("default_tier") ); } + + @Test + public void test_getAllUsedSegments() throws JsonProcessingException + { + final List<DataSegment> segments = ImmutableList.of(SEGMENT1, SEGMENT2, SEGMENT3); + + serviceClient.expectAndRespond( + new RequestBuilder( + HttpMethod.GET, + "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments" + ), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(segments) + ); + + CloseableIterator<SegmentStatusInCluster> iterator = FutureUtils.getUnchecked( + coordinatorClient.getAllUsedSegments(null, true, true), + true + ); + List<SegmentStatusInCluster> actualSegments = new ArrayList<>(); + while (iterator.hasNext()) { + actualSegments.add(iterator.next()); + } + Assert.assertEquals( + segments, + actualSegments.stream() + .map(SegmentStatusInCluster::getDataSegment) + .collect(ImmutableList.toImmutableList()) + ); + } + + @Test + public void test_getAllUsedSegments_noParams() throws JsonProcessingException + { + final List<DataSegment> segments = ImmutableList.of(SEGMENT1, SEGMENT2, SEGMENT3); + + serviceClient.expectAndRespond( + new RequestBuilder( + HttpMethod.GET, + "/druid/coordinator/v1/metadata/segments" + ), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(segments) + ); + + CloseableIterator<SegmentStatusInCluster> iterator = FutureUtils.getUnchecked( + coordinatorClient.getAllUsedSegments(null, false, false), + true + ); + List<SegmentStatusInCluster> actualSegments = new ArrayList<>(); + while (iterator.hasNext()) { + actualSegments.add(iterator.next()); + } + Assert.assertEquals( + segments, + actualSegments.stream() + .map(SegmentStatusInCluster::getDataSegment) + .collect(ImmutableList.toImmutableList()) + ); + } + + @Test + public void test_getAllUsedSegments_excludeRealtimeSegments() throws JsonProcessingException + { + final List<DataSegment> segments = ImmutableList.of(SEGMENT1, SEGMENT2, SEGMENT3); + + serviceClient.expectAndRespond( + new RequestBuilder( + HttpMethod.GET, + "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus" + ), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(segments) + ); + + CloseableIterator<SegmentStatusInCluster> iterator = FutureUtils.getUnchecked( + coordinatorClient.getAllUsedSegments(null, true, false), + true + ); + List<SegmentStatusInCluster> actualSegments = new ArrayList<>(); + while (iterator.hasNext()) { + actualSegments.add(iterator.next()); + } + Assert.assertEquals( + segments, + actualSegments.stream() + .map(SegmentStatusInCluster::getDataSegment) + .collect(ImmutableList.toImmutableList()) + ); + } + + @Test + public void test_getAllUsedSegments_filterByDataSource() throws Exception + { + serviceClient.expectAndRespond( + new RequestBuilder( + HttpMethod.GET, + "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments&datasources=abc" + ), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(ImmutableList.of(SEGMENT3)) + ); + + CloseableIterator<SegmentStatusInCluster> iterator = FutureUtils.getUnchecked( + coordinatorClient.getAllUsedSegments(Collections.singleton("abc"), true, true), + true + ); + + List<SegmentStatusInCluster> actualSegments = new ArrayList<>(); + while (iterator.hasNext()) { + actualSegments.add(iterator.next()); + } + Assert.assertEquals( + ImmutableList.of(SEGMENT3), + actualSegments.stream() + .map(SegmentStatusInCluster::getDataSegment) + .collect(ImmutableList.toImmutableList()) + ); + } + + @Test + public void test_getAllUsedSegments_filterByDataSourceOnly() throws Exception + { + serviceClient.expectAndRespond( + new RequestBuilder( + HttpMethod.GET, + "/druid/coordinator/v1/metadata/segments?datasources=abc" + ), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(ImmutableList.of(SEGMENT3)) + ); + + CloseableIterator<SegmentStatusInCluster> iterator = FutureUtils.getUnchecked( + coordinatorClient.getAllUsedSegments(Collections.singleton("abc"), false, false), + true + ); + + List<SegmentStatusInCluster> actualSegments = new ArrayList<>(); + while (iterator.hasNext()) { + actualSegments.add(iterator.next()); + } + Assert.assertEquals( + ImmutableList.of(SEGMENT3), + actualSegments.stream() + .map(SegmentStatusInCluster::getDataSegment) + .collect(ImmutableList.toImmutableList()) + ); + } + + + @Test + public void test_getRulesForAllDatasources() throws Exception + { + final Map<String, List<Rule>> rules = ImmutableMap.of( + "xyz", List.of( + new IntervalLoadRule( + Intervals.of("2025-01-01/2025-02-01"), + ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS), + null + ), + new IntervalLoadRule( + Intervals.of("2025-02-01/2025-03-01"), + ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS), + null + ) + ) + ); + + serviceClient.expectAndRespond( + new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/rules"), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(rules) + ); + + Assert.assertEquals( + rules, + coordinatorClient.getRulesForAllDatasources().get() + ); + } + + @Test + public void test_getRules_ForAllDatasources_HttpException_throwsError() + { + serviceClient.expectAndThrow( + new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/rules"), + new HttpResponseException( + new StringFullResponseHolder( + new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND), Review Comment: I don't think this test case is needed since this API will never throw a 404. ########## services/src/test/java/org/apache/druid/server/router/CoordinatorRuleManagerTest.java: ########## @@ -85,11 +87,35 @@ public void testAddingToRulesListThrowingError() rules.get(DATASOURCE1).add(new ForeverDropRule()); } + @Test + public void testThrowingExceptionOnHTTPException() Review Comment: ```suggestion public void test_poll_throwsException_ifCoordinatorApiReturnsNotOk() ``` ########## server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java: ########## @@ -287,6 +294,93 @@ public Map<String, LookupExtractorFactoryContainer> fetchLookupsForTierSync(Stri } } + @Override + public ListenableFuture<CloseableIterator<SegmentStatusInCluster>> getAllUsedSegments( + @Nullable Set<String> watchedDataSources, + Boolean includeOvershadowedStatus, + Boolean includeRealtimeSegments + ) + { + final StringBuilder pathBuilder = new StringBuilder( + "/druid/coordinator/v1/metadata/segments"); + + List<String> params = new ArrayList<>(); + if (includeOvershadowedStatus != null && includeOvershadowedStatus) { + params.add("includeOvershadowedStatus"); + } + if (includeRealtimeSegments != null && includeRealtimeSegments) { + params.add("includeRealtimeSegments"); + } + if (watchedDataSources != null && !watchedDataSources.isEmpty()) { + StringBuilder datasourceParam = new StringBuilder("datasources="); + for (String dataSource : watchedDataSources) { + datasourceParam.append(StringUtils.urlEncode(dataSource)); + } Review Comment: I don't think this is correct. If we had multiple watched datasources, this would give us: ``` datasources=ds1ds2ds3 ``` when in fact we want ``` datasources=ds1&datasources=ds2&datasources=ds3 ``` ########## server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java: ########## @@ -119,4 +123,40 @@ public interface CoordinatorClient * @param tier The name of the tier for which the lookup configuration is to be fetched. */ Map<String, LookupExtractorFactoryContainer> fetchLookupsForTierSync(String tier); + + /** + * Returns an iterator over the metadata segments in the cluster. + * <p> + * API: {@code GET /druid/coordinator/v1/metadata/segments} + * + * @param watchedDataSources Optional datasources to filter the segments by. If null or empty, all segments are returned. + * @param includeOvershadowedStatus If true, includes the overshadowed status of each segment. + * @param includeRealtimeSegments If true, includes realtime segments in the result. + */ + ListenableFuture<CloseableIterator<SegmentStatusInCluster>> getAllUsedSegments( Review Comment: @uds5501 , I didn't realize that there is another method already exposed by this interface `fetchUsedSegment`. But it is used to fetch the segments of a single datasource. We should do a couple of things here: - Add a short javadoc to this new method to indicate that it can fetch segments for multiple datasources in one go. - Also, it turns out that `includeOvershadowedStatus` always has to be true. If it is not true, the server returns a response of a different type which will throw a parsing error on the client. So, let's get rid of the `includeOvershadowedStatus` argument here. - Rename the method to `fetchAllUsedSegmentsWithOvershadowedStatus` to indicate this fact. - The `includeRealtimeSegments` argument should be a primitive `boolean`, not a boxed `Boolean`. ########## server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java: ########## @@ -531,4 +553,300 @@ public void test_fetchLookupsForTierSync_detailedEnabled() throws Exception coordinatorClient.fetchLookupsForTierSync("default_tier") ); } + + @Test + public void test_getAllUsedSegments() throws JsonProcessingException Review Comment: ```suggestion public void test_getAllUsedSegments_includeOvershadowedAndRealtime() throws JsonProcessingException ``` ########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedCoordinatorClientTest.java: ########## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.server; + +import org.apache.druid.client.ImmutableSegmentLoadInfo; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule; +import org.apache.druid.server.coordinator.rules.Rule; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedClusterApis; +import org.apache.druid.testing.embedded.EmbeddedCoordinator; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedHistorical; +import org.apache.druid.testing.embedded.EmbeddedIndexer; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.indexing.Resources; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentStatusInCluster; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class EmbeddedCoordinatorClientTest extends EmbeddedClusterTestBase +{ + private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator(); + private final EmbeddedBroker broker = new EmbeddedBroker(); + private final EmbeddedIndexer indexer = new EmbeddedIndexer(); + private final EmbeddedOverlord overlord = new EmbeddedOverlord(); + private final EmbeddedHistorical historical = new EmbeddedHistorical(); + + + @Override + protected EmbeddedDruidCluster createCluster() + { + indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s"); + overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s"); + + return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper() + .useLatchableEmitter() + .addServer(coordinator) + .addServer(indexer) + .addServer(overlord) + .addServer(historical) + .addServer(broker); + } + + @Test + public void test_findCurrentLeader() + { + URI currentLeader = cluster.callApi().onLeaderCoordinator(CoordinatorClient::findCurrentLeader); + Assertions.assertEquals(8081, currentLeader.getPort()); + } + + @Test + @Timeout(20) + public void test_isHandoffComplete() + { + batchIngest(); + coordinator.latchableEmitter().waitForEventAggregate( + event -> event.hasMetricName("segment/loadQueue/success") + .hasDimension(DruidMetrics.DATASOURCE, dataSource), + agg -> agg.hasSumAtLeast(1) + ); + final List<DataSegment> segments = new ArrayList<>( + overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource, null) + ); + DataSegment firstSegment = segments.get(0); + Boolean result = cluster.callApi().onLeaderCoordinator( + c -> c.isHandoffComplete( + dataSource, + new SegmentDescriptor(firstSegment.getInterval(), firstSegment.getVersion(), 0) + ) + ); + Assertions.assertTrue(result); + } + + @Test + @Timeout(20) + public void test_fetchSegment() + { + batchIngest(); + final List<DataSegment> segments = new ArrayList<>( + overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource, null) + ); + DataSegment firstSegment = segments.get(0); + DataSegment result = cluster.callApi().onLeaderCoordinator( + c -> c.fetchSegment( + dataSource, + firstSegment.getId().toString(), + true + ) + ); + Assert.assertEquals(firstSegment, result); + } + + @Test + @Timeout(20) + public void test_fetchServerViewSegments() + { + batchIngest(); + coordinator.latchableEmitter().waitForEventAggregate( + event -> event.hasMetricName("segment/loadQueue/success") + .hasDimension(DruidMetrics.DATASOURCE, dataSource), + agg -> agg.hasSumAtLeast(1) + ); + + final List<DataSegment> segments = new ArrayList<>( + overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource, null) + ); + List<Interval> intervals = List.of(segments.get(0).getInterval()); + Iterable<ImmutableSegmentLoadInfo> segmentLoadInfo = cluster.callApi().onLeaderCoordinatorSync( + c -> c.fetchServerViewSegments(dataSource, intervals)); + + Assertions.assertTrue(segmentLoadInfo.iterator().hasNext()); + ImmutableSegmentLoadInfo segmentLoad = segmentLoadInfo.iterator().next(); + Assertions.assertEquals(segments.get(0), segmentLoad.getSegment()); + } + + @Test + @Timeout(20) + public void test_fetchUsedSegments() + { + batchIngest(); + coordinator.latchableEmitter().waitForEventAggregate( + event -> event.hasMetricName("segment/loadQueue/success") + .hasDimension(DruidMetrics.DATASOURCE, dataSource), + agg -> agg.hasSumAtLeast(1) + ); + + final List<DataSegment> segments = new ArrayList<>( + overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource, null) + ); + List<DataSegment> result = cluster.callApi().onLeaderCoordinator( + c -> c.fetchUsedSegments(dataSource, List.of(Intervals.ETERNITY)) + ); + + Assertions.assertEquals(segments.size(), result.size()); + } + + @Test + @Timeout(20) + public void test_getAllUsedSegments() throws IOException + { + batchIngest(); + coordinator.latchableEmitter().waitForEventAggregate( + event -> event.hasMetricName("segment/loadQueue/success") + .hasDimension(DruidMetrics.DATASOURCE, dataSource), + agg -> agg.hasSumAtLeast(1) + ); + + try (CloseableIterator<SegmentStatusInCluster> iterator = cluster.callApi().onLeaderCoordinator( + c -> c.getAllUsedSegments(Set.of(dataSource), true, true)) + ) { + Assertions.assertTrue(iterator.hasNext()); + SegmentStatusInCluster segmentStatus = iterator.next(); + Assertions.assertEquals(dataSource, segmentStatus.getDataSegment().getDataSource()); + } + } + + @Test + @Timeout(20) + public void test_loadRules() + { + Rule broadcastRule = new ForeverBroadcastDistributionRule(); + cluster.callApi().onLeaderCoordinator( + c -> c.updateRulesForDatasource(dataSource, List.of(broadcastRule)) + ); + Map<String, List<Rule>> rules = cluster.callApi().onLeaderCoordinator(CoordinatorClient::getRulesForAllDatasources); + Assertions.assertTrue(!rules.isEmpty()); + Assertions.assertEquals(List.of(broadcastRule), rules.get(dataSource)); + } + + private void batchIngest() Review Comment: ```suggestion private void runIndexTask() ``` ########## services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java: ########## @@ -180,7 +185,8 @@ public static Map<String, Object> deserializeJsonToMap(String payload) try { return TestHelper.JSON_MAPPER.readValue( payload, - new TypeReference<>() {} + new TypeReference<>() + {} Review Comment: 😄 ```suggestion new TypeReference<>() {} ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org