adarshsanjeev commented on code in PR #17775:
URL: https://github.com/apache/druid/pull/17775#discussion_r2007091471
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -139,17 +146,72 @@ public HttpLoadQueuePeon(
this.callBackExecutor = callBackExecutor;
this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = fetchSegmentLoadingCapabilities();
+ }
+
+ @VisibleForTesting
+ HttpLoadQueuePeon(
+ String baseUrl,
+ ObjectMapper jsonMapper,
+ HttpClient httpClient,
+ HttpLoadQueuePeonConfig config,
+ ScheduledExecutorService processingExecutor,
+ ExecutorService callBackExecutor,
+ Supplier<SegmentLoadingMode> loadingModeSupplier,
+ HistoricalLoadingCapabilities serverCapabilities
+ )
+ {
+ this.jsonMapper = jsonMapper;
+ this.requestBodyWriter = jsonMapper.writerFor(REQUEST_ENTITY_TYPE_REF);
+ this.httpClient = httpClient;
+ this.config = config;
+ this.processingExecutor = processingExecutor;
+ this.callBackExecutor = callBackExecutor;
+
+ this.serverId = baseUrl;
+ this.loadingModeSupplier = loadingModeSupplier;
+ this.serverCapabilities = serverCapabilities;
+ }
+
+ private HistoricalLoadingCapabilities fetchSegmentLoadingCapabilities()
+ {
try {
- this.changeRequestURL = new URL(
- new URL(baseUrl),
- StringUtils.nonStrictFormat(
- "druid-internal/v1/segments/changeRequests?timeout=%d",
- config.getHostTimeout().getMillis()
- )
+ log.trace("Fetching historical capabilities from Server[%s].", new URL(serverId));
+ final URL segmentLoadingCapabilitiesURL = new URL(
+ new URL(serverId),
+ "druid-internal/v1/segments/segmentLoadingCapabilities"
+ );
+
+ BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
+ InputStream stream = httpClient.go(
+ new Request(HttpMethod.GET, segmentLoadingCapabilitiesURL)
+ .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON),
+ responseHandler,
+ new Duration(10000)
+ ).get();
+
+ if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
+ log.info(
+ "Historical capabilities endpoint not found at server[%s]. Using default values.",
Review Comment:
The server was already included, so I changed this to the URL.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]