kfaraz commented on code in PR #17899: URL: https://github.com/apache/druid/pull/17899#discussion_r2041632765
########## server/src/main/java/org/apache/druid/server/coordinator/CloneStatusMetrics.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class CloneStatusMetrics Review Comment: Add a short javadoc ```suggestion public class ServerCloneStatus ``` ########## server/src/main/java/org/apache/druid/client/CoordinatorDynamicConfigView.java: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import com.google.common.collect.ImmutableSet; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import com.google.inject.Inject; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.DruidInternalDynamicConfigResource; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; + +import javax.validation.constraints.NotNull; +import java.util.Map; +import java.util.Set; + +/** + * Broker view of the coordinator dynamic configuration, and its derived values such as target and source clone servers. + * This class is registered as a managed lifecycle to fetch the coordinator dynamic configuration on startup. Further + * updates are handled through {@link DruidInternalDynamicConfigResource}. + */ +public class CoordinatorDynamicConfigView +{ + private static final Logger log = new Logger(CoordinatorDynamicConfigView.class); + private final CoordinatorClient coordinatorClient; + + @GuardedBy("this") + private CoordinatorDynamicConfig config; Review Comment: Let's use an `AtomicReference` and get rid of all the `synchronized` for simplicity. 
########## server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java: ########## @@ -158,4 +164,28 @@ public Response getStatusOfDuties() { return Response.ok(new CoordinatorDutyStatus(coordinator.getStatusOfDuties())).build(); } + + @GET + @Path("/brokerConfigurationStatus") + @ResourceFilters(StateResourceFilter.class) + @Produces(MediaType.APPLICATION_JSON) + public Response getBrokerStatus() + { + return Response.ok(coordinatorDynamicConfigSyncer.getInSyncBrokers()).build(); + } + + @GET + @Path("/cloneStatus") + @ResourceFilters(StateResourceFilter.class) + @Produces(MediaType.APPLICATION_JSON) + public Response getCloneStatus(@QueryParam("targetServer") @Nullable String targetServer) + { + if (targetServer != null) { + CloneStatusMetrics statusForServer = cloneStatusManager.getStatusForServer(targetServer); + return Response.ok(ImmutableMap.of(targetServer, statusForServer)).build(); + Review Comment: nit: extra line. ########## server/src/main/java/org/apache/druid/server/coordinator/CloneStatusManager.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordinator; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.server.coordinator.loading.SegmentAction; +import org.apache.druid.timeline.DataSegment; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +public class CloneStatusManager +{ + private final AtomicReference<Map<String, CloneStatusMetrics>> cloneStatusSnapshot; Review Comment: Rather than a map, this can be a dedicated `CloneStatus` object. See other comments regarding the payload of the clone status API. ########## server/src/main/java/org/apache/druid/server/coordinator/CloneStatusMetrics.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordinator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class CloneStatusMetrics +{ + private final String sourceServer; + private final Status status; + private final long segmentLoadsRemaining; + private final long segmenetsDropsRemaining; + private final long bytesRemaining; + + @JsonCreator + public CloneStatusMetrics( + @JsonProperty("sourceServer") String sourceServer, + @JsonProperty("status") Status status, + @JsonProperty("segmentLoadsRemaining") long segmentLoadsRemaining, + @JsonProperty("segmentDropsRemaining") long segmenetsDropsRemaining, + @JsonProperty("bytesRemaining") long bytesRemaining + ) + { + this.sourceServer = sourceServer; + this.status = status; + this.segmentLoadsRemaining = segmentLoadsRemaining; + this.segmenetsDropsRemaining = segmenetsDropsRemaining; + this.bytesRemaining = bytesRemaining; + } + + @JsonProperty("sourceServer") + public String getSourceServer() + { + return sourceServer; + } + + @JsonProperty("segmentLoadsRemaining") + public long getSegmentLoadsRemaining() + { + return segmentLoadsRemaining; + } + + @JsonProperty("segmentDropsRemaining") + public long getSegmenetsDropsRemaining() + { + return segmenetsDropsRemaining; + } + + @JsonProperty("bytesRemaining") Review Comment: You need not specify the property names explicitly since the getters already follow the naming convention. ########## docs/querying/query-context.md: ########## @@ -66,6 +66,7 @@ See [SQL query context](sql-query-context.md) for other query context parameters |`debug`| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). 
When set to true, the following additional logs will be produced:<br />- Log the stack trace of the exception (if any) produced by the query | |`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. | |`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects.| +|`cloneQueryMode`|`EXCLUDE`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `EXCLUDE`, `INCLUDE` and `CLONE_PREFERRED`. `EXCLUDE` means that clone Historicals are not queried by the broker. `CLONE_PREFERRED` indicates that when given a choice between the clone Historical and the original Historical which is being cloned, the broker chooses the clones; Historicals which are not involved in the cloning process will still be queried. `INCLUDE` means that the broker queries any Historical regardless of clone status. This parameter only affects native queries; MSQ does not query Historicals directly, and Dart will not respect this context parameter.| Review Comment: Couple of suggestions: - Druid typically uses camel-casing for config values. 
See [enum UsageMode](https://github.com/apache/druid/blob/a85a3e58570d2eec62a5be312fb9e7c86fe37be7/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java#L94) for an example usage - I don't think we want to prefer clones for querying, we will either query them or not, so that the behaviour of queries is more deterministic. Thus, modes should be `excludeClones`, `includeClones` or `onlyClones`. ```suggestion |`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones` and `onlyClones`. `EXCLUDE` means that clone Historicals are not queried by the broker. `CLONE_PREFERRED` indicates that when given a choice between the clone Historical and the original Historical which is being cloned, the broker chooses the clones; Historicals which are not involved in the cloning process will still be queried. `INCLUDE` means that broker queries any Historical without regarding clone status. This parameter only affects native queries; MSQ does not query Historicals directly, and Dart will not respect this context parameter.| ``` ########## server/src/main/java/org/apache/druid/client/CoordinatorDynamicConfigView.java: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import com.google.common.collect.ImmutableSet; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import com.google.inject.Inject; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.DruidInternalDynamicConfigResource; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; + +import javax.validation.constraints.NotNull; +import java.util.Map; +import java.util.Set; + +/** + * Broker view of the coordinator dynamic configuration, and its derived values such as target and source clone servers. + * This class is registered as a managed lifecycle to fetch the coordinator dynamic configuration on startup. Further + * updates are handled through {@link DruidInternalDynamicConfigResource}. + */ +public class CoordinatorDynamicConfigView +{ + private static final Logger log = new Logger(CoordinatorDynamicConfigView.class); + private final CoordinatorClient coordinatorClient; + + @GuardedBy("this") + private CoordinatorDynamicConfig config; + @GuardedBy("this") + private Set<String> sourceCloneServers; + @GuardedBy("this") + private Set<String> targetCloneServers; Review Comment: These should not be separate fields as they are already present inside the dynamic config and can always be read from there. 
########## docs/api-reference/service-status-api.md: ########## @@ -1334,4 +1334,126 @@ Host: http://BROKER_IP:BROKER_PORT #### Sample response -A successful response to this endpoint results in an empty response body. \ No newline at end of file +A successful response to this endpoint results in an empty response body. + +### Get Historical Cloning Status + +Retrieves the current status of Historical cloning. + +#### URL + +`GET` `/druid/coordinator/v1/cloneStatus` + +#### Responses + +<Tabs> + +<TabItem value="56" label="200 SUCCESS"> + + +<br/> + +*Successfully retrieved cloning status* + +</TabItem> +</Tabs> + +#### Sample request + +<Tabs> + +<TabItem value="58" label="cURL"> + + +```shell +curl "http://COORDINATOR_IP:COORDINATOR_PORT/druid/coordinator/v1/cloneStatus" +``` + +</TabItem> +<TabItem value="59" label="HTTP"> + + +```http +GET /druid/coordinator/v1/cloneStatus HTTP/1.1 +Host: http://COORDINATOR_IP:COORDINATOR_PORT +``` + +</TabItem> +</Tabs> + +#### Sample response + +<details> + <summary>View the response</summary> + +```json +{ + "localhost:8083": { + "sourceServer": "localhost:8089", + "status": "LOADING", + "segmentLoadsRemaining": 0, + "segmenetsDropsRemaining": 0, + "bytesRemaining": 0 + } Review Comment: I think it would be better to have the payload look more like this, i.e. an array of status objects, each of which contains a source, target and other info. 
```suggestion "cloneStatus": [{ "sourceServer": "localhost:8089", "targetServer": "localhost:8083", "status": "LOADING", "segmentLoadsRemaining": 0, "segmenetsDropsRemaining": 0, "bytesRemaining": 0 }] ``` ########## server/src/main/java/org/apache/druid/server/BrokerQueryResource.java: ########## @@ -89,19 +97,22 @@ public Response getQueryTargets( InputStream in, @QueryParam("pretty") String pretty, @QueryParam("numCandidates") @DefaultValue("-1") int numCandidates, + @QueryParam("cloneQueryMode") @Nullable CloneQueryMode cloneQueryMode, @Context final HttpServletRequest req ) throws IOException { final ResourceIOReaderWriter ioReaderWriter = createResourceIOReaderWriter(req, pretty != null); try { Query<?> query = ioReaderWriter.getRequestMapper().readValue(in, Query.class); ExecutionVertex ev = ExecutionVertex.of(query); + HistoricalFilter historicalFilter = new HistoricalFilter(configView, cloneQueryMode == null ? QueryContexts.DEFAULT_CLONE_QUERY_MODE : cloneQueryMode); Review Comment: Use `QueryContexts.getAsEnum()` here. ########## server/src/main/java/org/apache/druid/server/coordinator/CloneStatusMetrics.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordinator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class CloneStatusMetrics +{ + private final String sourceServer; + private final Status status; + private final long segmentLoadsRemaining; + private final long segmenetsDropsRemaining; + private final long bytesRemaining; + + @JsonCreator + public CloneStatusMetrics( + @JsonProperty("sourceServer") String sourceServer, + @JsonProperty("status") Status status, + @JsonProperty("segmentLoadsRemaining") long segmentLoadsRemaining, + @JsonProperty("segmentDropsRemaining") long segmenetsDropsRemaining, + @JsonProperty("bytesRemaining") long bytesRemaining + ) + { + this.sourceServer = sourceServer; + this.status = status; + this.segmentLoadsRemaining = segmentLoadsRemaining; + this.segmenetsDropsRemaining = segmenetsDropsRemaining; + this.bytesRemaining = bytesRemaining; + } + + @JsonProperty("sourceServer") + public String getSourceServer() + { + return sourceServer; + } + + @JsonProperty("segmentLoadsRemaining") + public long getSegmentLoadsRemaining() + { + return segmentLoadsRemaining; + } + + @JsonProperty("segmentDropsRemaining") + public long getSegmenetsDropsRemaining() + { + return segmenetsDropsRemaining; + } + + @JsonProperty("bytesRemaining") + public long getBytesRemaining() + { + return bytesRemaining; + } + + @JsonProperty("status") + public Status getStatus() + { + return status; + } + + public static CloneStatusMetrics unknown(String sourceServer) + { + return new CloneStatusMetrics(sourceServer, Status.TARGET_SERVER_MISSING, 0, 0, 0); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CloneStatusMetrics that = (CloneStatusMetrics) o; + return segmentLoadsRemaining == that.segmentLoadsRemaining + && segmenetsDropsRemaining == 
that.segmenetsDropsRemaining + && bytesRemaining == that.bytesRemaining + && Objects.equals(sourceServer, that.sourceServer) + && status == that.status; + } + + @Override + public int hashCode() + { + return Objects.hash(sourceServer, status, segmentLoadsRemaining, segmenetsDropsRemaining, bytesRemaining); + } + + @Override + public String toString() + { + return "CloneStatusMetrics{" + + "sourceServer='" + sourceServer + '\'' + + ", status=" + status + + ", segmentLoadsRemaining=" + segmentLoadsRemaining + + ", segmenetsDropsRemaining=" + segmenetsDropsRemaining + + ", bytesRemaining=" + bytesRemaining + + '}'; + } + + public enum Status Review Comment: ```suggestion public enum State ``` ########## server/src/main/java/org/apache/druid/client/CoordinatorDynamicConfigView.java: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.client; + +import com.google.common.collect.ImmutableSet; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import com.google.inject.Inject; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.DruidInternalDynamicConfigResource; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; + +import javax.validation.constraints.NotNull; +import java.util.Map; +import java.util.Set; + +/** + * Broker view of the coordinator dynamic configuration, and its derived values such as target and source clone servers. + * This class is registered as a managed lifecycle to fetch the coordinator dynamic configuration on startup. Further + * updates are handled through {@link DruidInternalDynamicConfigResource}. + */ +public class CoordinatorDynamicConfigView +{ + private static final Logger log = new Logger(CoordinatorDynamicConfigView.class); + private final CoordinatorClient coordinatorClient; + + @GuardedBy("this") + private CoordinatorDynamicConfig config; + @GuardedBy("this") + private Set<String> sourceCloneServers; + @GuardedBy("this") + private Set<String> targetCloneServers; + + @Inject + public CoordinatorDynamicConfigView(CoordinatorClient coordinatorClient) + { + this.coordinatorClient = coordinatorClient; + } + + public synchronized CoordinatorDynamicConfig getDynamicConfiguration() + { + return config; + } + + /** + * Update the config view with a new coordinator dynamic config snapshot. Also updates the source and target clone + * servers based on the new dynamic configuration. 
+ */ + public synchronized void setDynamicConfiguration(@NotNull CoordinatorDynamicConfig updatedConfig) Review Comment: ```suggestion public synchronized void setDynamicConfig(@NotNull CoordinatorDynamicConfig updatedConfig) ``` ########## docs/api-reference/service-status-api.md: ########## @@ -1334,4 +1334,126 @@ Host: http://BROKER_IP:BROKER_PORT #### Sample response -A successful response to this endpoint results in an empty response body. \ No newline at end of file +A successful response to this endpoint results in an empty response body. + +### Get Historical Cloning Status Review Comment: This section should be moved under the Coordinator heading. ########## docs/api-reference/service-status-api.md: ########## @@ -1334,4 +1334,126 @@ Host: http://BROKER_IP:BROKER_PORT #### Sample response -A successful response to this endpoint results in an empty response body. \ No newline at end of file +A successful response to this endpoint results in an empty response body. + +### Get Historical Cloning Status + +Retrieves the current status of Historical cloning. Review Comment: ```suggestion Retrieves the current status of Historical cloning from the Coordinator. ``` ########## server/src/main/java/org/apache/druid/server/BrokerQueryResource.java: ########## @@ -89,19 +97,22 @@ public Response getQueryTargets( InputStream in, @QueryParam("pretty") String pretty, @QueryParam("numCandidates") @DefaultValue("-1") int numCandidates, + @QueryParam("cloneQueryMode") @Nullable CloneQueryMode cloneQueryMode, Review Comment: Pass in a String rather than an enum here. ########## server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java: ########## @@ -191,6 +192,30 @@ private Map<Dimension, String> validateDebugDimensions(Map<String, String> debug return validDebugDimensions; } + public CoordinatorDynamicConfig snapshot() Review Comment: Is this meant to create a copy of the config or a snapshot? 
Either way, the config itself is immutable, so we shouldn't need this method. ########## server/src/main/java/org/apache/druid/client/CachingClusteredClient.java: ########## @@ -139,7 +141,8 @@ public CachingClusteredClient( BrokerParallelMergeConfig parallelMergeConfig, @Merging ForkJoinPool pool, QueryScheduler scheduler, - ServiceEmitter emitter + ServiceEmitter emitter, + CoordinatorDynamicConfigView coordinatorDynamicConfigView Review Comment: Nit: Please move this arg to after or just before `TimelineServerView`. ########## server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncer.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.http; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler; +import org.apache.druid.java.util.http.client.response.BytesFullResponseHolder; +import org.apache.druid.rpc.FixedServiceLocator; +import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.rpc.ServiceClient; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocation; +import org.apache.druid.rpc.StandardRetryPolicy; +import org.apache.druid.server.coordinator.CoordinatorConfigManager; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.net.URL; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * Updates all brokers with the latest coordinator dynamic config. 
+ */ +public class CoordinatorDynamicConfigSyncer +{ + private static final Logger log = new Logger(CoordinatorDynamicConfigSyncer.class); + + private final CoordinatorConfigManager configManager; + private final ObjectMapper jsonMapper; + private final DruidNodeDiscoveryProvider druidNodeDiscovery; + + private final AtomicReference<CoordinatorDynamicConfig> lastKnownConfig = new AtomicReference<>(); + private final ServiceClientFactory clientFactory; + private final ExecutorService exec; + private final Set<String> inSyncBrokers; + + @Inject + public CoordinatorDynamicConfigSyncer( + @EscalatedGlobal final ServiceClientFactory clientFactory, + final CoordinatorConfigManager configManager, + @Json final ObjectMapper jsonMapper, + final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider + ) + { + this.clientFactory = clientFactory; + this.configManager = configManager; + this.jsonMapper = jsonMapper; + this.druidNodeDiscovery = druidNodeDiscoveryProvider; + this.exec = Execs.singleThreaded("DynamicConfigSyncer-%d"); + this.inSyncBrokers = ConcurrentHashMap.newKeySet(); + } + + public void broadcastConfigToBrokers() + { + invalidateInSyncBrokersIfNeeded(); + for (ServiceLocation broker : getKnownBrokers()) { + exec.submit(() -> pushConfigToBroker(broker)); + } + } + + public synchronized Set<String> getInSyncBrokers() + { + return Set.copyOf(inSyncBrokers); + } + + private void pushConfigToBroker(ServiceLocation brokerLocation) + { + final ServiceClient brokerClient = clientFactory.makeClient( + NodeRole.BROKER.getJsonName(), + new FixedServiceLocator(brokerLocation), + StandardRetryPolicy.builder().maxAttempts(6).build() + ); + + try { + CoordinatorDynamicConfig currentDynamicConfig = configManager.getCurrentDynamicConfig(); + final RequestBuilder requestBuilder = + new RequestBuilder(HttpMethod.POST, "/druid-internal/v1/dynamicConfiguration/coordinatorDynamicConfig") + .jsonContent(jsonMapper, currentDynamicConfig); + + final BytesFullResponseHolder 
responseHolder = brokerClient.request(requestBuilder, new BytesFullResponseHandler()); + final HttpResponseStatus status = responseHolder.getStatus(); + if (status.equals(HttpResponseStatus.OK)) { + addToInSyncBrokers(currentDynamicConfig, brokerLocation); + } else { + log.error( + "Received status [%s] while posting dynamic configs to broker[%s]", + status.getCode(), + brokerLocation + ); + } + } + catch (Exception e) { + // Catch and ignore the exception, wait for the next sync. + log.error( + e, + "Exception while syncing dynamic configuration to broker[%s]", + brokerLocation + ); + } + } + + private Set<ServiceLocation> getKnownBrokers() + { + return druidNodeDiscovery.getForNodeRole(NodeRole.BROKER) + .getAllNodes() + .stream() + .map(DiscoveryDruidNode::toServiceLocation) + .collect(Collectors.toSet()); + } + + private synchronized void invalidateInSyncBrokersIfNeeded() + { + final CoordinatorDynamicConfig currentDynamicConfig = configManager.getCurrentDynamicConfig(); + if (!currentDynamicConfig.equals(lastKnownConfig.get())) { + // Config has changed, clear the inSync list. + inSyncBrokers.clear(); + lastKnownConfig.set(currentDynamicConfig); + } + } + + private synchronized void addToInSyncBrokers(CoordinatorDynamicConfig config, ServiceLocation broker) Review Comment: ```suggestion private synchronized void markBrokerAsSynced(CoordinatorDynamicConfig config, ServiceLocation broker) ``` ########## server/src/main/java/org/apache/druid/server/coordinator/duty/BrokerDynamicConfigSync.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.duty; + +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.http.CoordinatorDynamicConfigSyncer; + +/** + * Duty to periodically broadcast the coordinator dynamic configuration to all brokers. + */ +public class BrokerDynamicConfigSync implements CoordinatorDuty Review Comment: ```suggestion public class SendDynamicConfigToBrokers implements CoordinatorDuty ``` ########## server/src/test/java/org/apache/druid/client/TestCoordinatorDynamicConfigView.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.client; + +import java.util.Set; + +public class TestCoordinatorDynamicConfigView extends CoordinatorDynamicConfigView Review Comment: Do not have this test implementation since the super class is fairly simple itself. Use a test coordinator client instead. ########## server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncer.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.http; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler; +import org.apache.druid.java.util.http.client.response.BytesFullResponseHolder; +import org.apache.druid.rpc.FixedServiceLocator; +import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.rpc.ServiceClient; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocation; +import org.apache.druid.rpc.StandardRetryPolicy; +import org.apache.druid.server.coordinator.CoordinatorConfigManager; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.net.URL; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * Updates all brokers with the latest coordinator dynamic config. 
+ */ +public class CoordinatorDynamicConfigSyncer +{ + private static final Logger log = new Logger(CoordinatorDynamicConfigSyncer.class); + + private final CoordinatorConfigManager configManager; + private final ObjectMapper jsonMapper; + private final DruidNodeDiscoveryProvider druidNodeDiscovery; + + private final AtomicReference<CoordinatorDynamicConfig> lastKnownConfig = new AtomicReference<>(); + private final ServiceClientFactory clientFactory; + private final ExecutorService exec; + private final Set<String> inSyncBrokers; + + @Inject + public CoordinatorDynamicConfigSyncer( + @EscalatedGlobal final ServiceClientFactory clientFactory, + final CoordinatorConfigManager configManager, + @Json final ObjectMapper jsonMapper, + final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider + ) + { + this.clientFactory = clientFactory; + this.configManager = configManager; + this.jsonMapper = jsonMapper; + this.druidNodeDiscovery = druidNodeDiscoveryProvider; + this.exec = Execs.singleThreaded("DynamicConfigSyncer-%d"); + this.inSyncBrokers = ConcurrentHashMap.newKeySet(); + } + + public void broadcastConfigToBrokers() + { + invalidateInSyncBrokersIfNeeded(); + for (ServiceLocation broker : getKnownBrokers()) { + exec.submit(() -> pushConfigToBroker(broker)); Review Comment: I don't think there is much point in submitting a separate runnable for each broker, since the underlying exec is single threaded. Might as well just do everything in one task. ########## server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncer.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.http; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler; +import org.apache.druid.java.util.http.client.response.BytesFullResponseHolder; +import org.apache.druid.rpc.FixedServiceLocator; +import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.rpc.ServiceClient; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocation; +import org.apache.druid.rpc.StandardRetryPolicy; +import org.apache.druid.server.coordinator.CoordinatorConfigManager; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.net.URL; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * Updates all brokers with the latest 
coordinator dynamic config. + */ +public class CoordinatorDynamicConfigSyncer +{ + private static final Logger log = new Logger(CoordinatorDynamicConfigSyncer.class); + + private final CoordinatorConfigManager configManager; + private final ObjectMapper jsonMapper; + private final DruidNodeDiscoveryProvider druidNodeDiscovery; + + private final AtomicReference<CoordinatorDynamicConfig> lastKnownConfig = new AtomicReference<>(); + private final ServiceClientFactory clientFactory; + private final ExecutorService exec; + private final Set<String> inSyncBrokers; + + @Inject + public CoordinatorDynamicConfigSyncer( + @EscalatedGlobal final ServiceClientFactory clientFactory, + final CoordinatorConfigManager configManager, + @Json final ObjectMapper jsonMapper, + final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider + ) + { + this.clientFactory = clientFactory; + this.configManager = configManager; + this.jsonMapper = jsonMapper; + this.druidNodeDiscovery = druidNodeDiscoveryProvider; + this.exec = Execs.singleThreaded("DynamicConfigSyncer-%d"); + this.inSyncBrokers = ConcurrentHashMap.newKeySet(); + } + + public void broadcastConfigToBrokers() + { + invalidateInSyncBrokersIfNeeded(); + for (ServiceLocation broker : getKnownBrokers()) { + exec.submit(() -> pushConfigToBroker(broker)); + } + } + + public synchronized Set<String> getInSyncBrokers() + { + return Set.copyOf(inSyncBrokers); + } + + private void pushConfigToBroker(ServiceLocation brokerLocation) + { + final ServiceClient brokerClient = clientFactory.makeClient( + NodeRole.BROKER.getJsonName(), + new FixedServiceLocator(brokerLocation), + StandardRetryPolicy.builder().maxAttempts(6).build() + ); + + try { + CoordinatorDynamicConfig currentDynamicConfig = configManager.getCurrentDynamicConfig(); + final RequestBuilder requestBuilder = + new RequestBuilder(HttpMethod.POST, "/druid-internal/v1/dynamicConfiguration/coordinatorDynamicConfig") + .jsonContent(jsonMapper, currentDynamicConfig); + + final 
BytesFullResponseHolder responseHolder = brokerClient.request(requestBuilder, new BytesFullResponseHandler()); + final HttpResponseStatus status = responseHolder.getStatus(); + if (status.equals(HttpResponseStatus.OK)) { + addToInSyncBrokers(currentDynamicConfig, brokerLocation); + } else { + log.error( + "Received status [%s] while posting dynamic configs to broker[%s]", + status.getCode(), + brokerLocation + ); + } + } + catch (Exception e) { + // Catch and ignore the exception, wait for the next sync. + log.error( + e, + "Exception while syncing dynamic configuration to broker[%s]", + brokerLocation + ); + } + } + + private Set<ServiceLocation> getKnownBrokers() + { + return druidNodeDiscovery.getForNodeRole(NodeRole.BROKER) + .getAllNodes() + .stream() + .map(DiscoveryDruidNode::toServiceLocation) Review Comment: Nit: How does using a `ServiceLocation` help? Is it so that we can use the `toURL()` method? ########## server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java: ########## @@ -43,11 +45,15 @@ public class CoordinatorResource { private final DruidCoordinator coordinator; + private final CloneStatusManager cloneStatusManager; + private final CoordinatorDynamicConfigSyncer coordinatorDynamicConfigSyncer; @Inject - public CoordinatorResource(DruidCoordinator coordinator) + public CoordinatorResource(DruidCoordinator coordinator, CloneStatusManager cloneStatusManager, CoordinatorDynamicConfigSyncer coordinatorDynamicConfigSyncer) Review Comment: Please put arguments in separate lines. ########## server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java: ########## @@ -200,6 +201,21 @@ public <T extends DruidService> T getService(String key, Class<T> clazz) return null; } + public ServiceLocation toServiceLocation() Review Comment: I don't think this is a common use case (yet). This method should live in the syncer class where this is actually used. 
########## server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncer.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.http; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler; +import org.apache.druid.java.util.http.client.response.BytesFullResponseHolder; +import org.apache.druid.rpc.FixedServiceLocator; +import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.rpc.ServiceClient; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocation; +import org.apache.druid.rpc.StandardRetryPolicy; +import 
org.apache.druid.server.coordinator.CoordinatorConfigManager; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.net.URL; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * Updates all brokers with the latest coordinator dynamic config. + */ +public class CoordinatorDynamicConfigSyncer Review Comment: It feels redundant to have both a duty and a dedicated syncer class with its own `exec`. I would have preferred having the duty, but since we need to trigger the sync whenever the dynamic config is updated, I think let's just have the syncer. When coordinator becomes leader, schedule a periodic sync on the `exec`. The periodic sync task just goes through each broker one by one. When it is not leader anymore, stop the sync. Emit metrics for: - sync time of each broker - total sync time - number of failed syncs ########## server/src/main/java/org/apache/druid/client/CoordinatorDynamicConfigView.java: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import com.google.common.collect.ImmutableSet; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import com.google.inject.Inject; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.DruidInternalDynamicConfigResource; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; + +import javax.validation.constraints.NotNull; +import java.util.Map; +import java.util.Set; + +/** + * Broker view of the coordinator dynamic configuration, and its derived values such as target and source clone servers. + * This class is registered as a managed lifecycle to fetch the coordinator dynamic configuration on startup. Further + * updates are handled through {@link DruidInternalDynamicConfigResource}. + */ +public class CoordinatorDynamicConfigView Review Comment: ```suggestion public class BrokerViewOfCoordinatorConfig ``` ########## server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigSyncer.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.http; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler; +import org.apache.druid.java.util.http.client.response.BytesFullResponseHolder; +import org.apache.druid.rpc.FixedServiceLocator; +import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.rpc.ServiceClient; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocation; +import org.apache.druid.rpc.StandardRetryPolicy; +import org.apache.druid.server.coordinator.CoordinatorConfigManager; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.net.URL; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * Updates all brokers with the latest coordinator dynamic config. 
+ */ +public class CoordinatorDynamicConfigSyncer +{ + private static final Logger log = new Logger(CoordinatorDynamicConfigSyncer.class); + + private final CoordinatorConfigManager configManager; + private final ObjectMapper jsonMapper; + private final DruidNodeDiscoveryProvider druidNodeDiscovery; + + private final AtomicReference<CoordinatorDynamicConfig> lastKnownConfig = new AtomicReference<>(); + private final ServiceClientFactory clientFactory; + private final ExecutorService exec; + private final Set<String> inSyncBrokers; + + @Inject + public CoordinatorDynamicConfigSyncer( + @EscalatedGlobal final ServiceClientFactory clientFactory, Review Comment: We should try to use the `BrokerClient` instead. We might need to add a new method there. ########## server/src/main/java/org/apache/druid/server/ClientInfoResource.java: ########## @@ -86,14 +89,16 @@ public class ClientInfoResource private SegmentMetadataQueryConfig segmentMetadataQueryConfig; private final AuthConfig authConfig; private final AuthorizerMapper authorizerMapper; + private final CoordinatorDynamicConfigView configView; @Inject public ClientInfoResource( FilteredServerInventoryView serverInventoryView, TimelineServerView timelineServerView, SegmentMetadataQueryConfig segmentMetadataQueryConfig, AuthConfig authConfig, - AuthorizerMapper authorizerMapper + AuthorizerMapper authorizerMapper, + CoordinatorDynamicConfigView configView Review Comment: Move this to after `TimelineServerView` arg. 
########## server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java: ########## @@ -559,7 +567,8 @@ private List<CoordinatorDuty> makeHistoricalManagementDuties() new MarkOvershadowedSegmentsAsUnused(deleteSegments), new MarkEternityTombstonesAsUnused(deleteSegments), new BalanceSegments(config.getCoordinatorPeriod()), - new CloneHistoricals(loadQueueManager), + new CloneHistoricals(loadQueueManager, cloneStatusManager), + new BrokerDynamicConfigSync(coordinatorDynamicConfigSyncer), Review Comment: Put this duty in a separate duty group to avoid affecting the other critical historical management duties. Or better yet, let's just get rid of the duty. See the other comments. ########## server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java: ########## @@ -607,7 +607,7 @@ private <T> QueryRunner<T> decorateClusterRunner(Query<T> query, QueryRunner<T> * @param parentQueryResourceId Parent Query's Query Resource ID * @return DataSource populated with the subqueries */ - private DataSource generateSubqueryIds( + public static DataSource generateSubqueryIds( Review Comment: Why public? ########## server/src/main/java/org/apache/druid/server/coordinator/CloneStatusManager.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.server.coordinator.loading.SegmentAction; +import org.apache.druid.timeline.DataSegment; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +public class CloneStatusManager +{ + private final AtomicReference<Map<String, CloneStatusMetrics>> cloneStatusSnapshot; + + public CloneStatusManager() + { + this.cloneStatusSnapshot = new AtomicReference<>(ImmutableMap.of()); Review Comment: ```suggestion this.cloneStatusSnapshot = new AtomicReference<>(Map.of()); ``` ########## server/src/main/java/org/apache/druid/server/DruidInternalDynamicConfigResource.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server; + +import com.google.inject.Inject; +import com.sun.jersey.spi.container.ResourceFilters; +import org.apache.druid.client.CoordinatorDynamicConfigView; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.http.security.ConfigResourceFilter; + +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +@Path("/druid-internal/v1/dynamicConfiguration") +public class DruidInternalDynamicConfigResource +{ + private final CoordinatorDynamicConfigView coordinatorDynamicConfigView; + + @Inject + public DruidInternalDynamicConfigResource(CoordinatorDynamicConfigView coordinatorDynamicConfigView) + { + this.coordinatorDynamicConfigView = coordinatorDynamicConfigView; + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(ConfigResourceFilter.class) + @Path("/coordinatorDynamicConfig") + public Response getDynamicConfig() + { + return Response.ok(coordinatorDynamicConfigView.getDynamicConfiguration()).build(); + } + + @POST + @Consumes(MediaType.APPLICATION_JSON) + @ResourceFilters(ConfigResourceFilter.class) + @Path("/coordinatorDynamicConfig") + public Response setDynamicConfig(final CoordinatorDynamicConfig dynamicConfig) + { + coordinatorDynamicConfigView.setDynamicConfiguration(dynamicConfig); + return Response.ok("OK").build(); Review Comment: Nicer to include a proper JSON response, even if it is just `{"success": true}`. 
```suggestion return Response.ok("OK").build(); ``` ########## server/src/main/java/org/apache/druid/server/ClientInfoResource.java: ########## @@ -300,6 +306,7 @@ public Iterable<LocatedSegmentDescriptor> getQueryTargets( @PathParam("dataSourceName") String datasource, @QueryParam("intervals") String intervals, @QueryParam("numCandidates") @DefaultValue("-1") int numCandidates, + @QueryParam("cloneQueryMode") CloneQueryMode cloneQueryMode, Review Comment: Use String param and `QueryContexts.getAsEnum()`, same as suggested in the other resource class. ########## server/src/main/java/org/apache/druid/server/DruidInternalDynamicConfigResource.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server; + +import com.google.inject.Inject; +import com.sun.jersey.spi.container.ResourceFilters; +import org.apache.druid.client.CoordinatorDynamicConfigView; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.http.security.ConfigResourceFilter; + +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +@Path("/druid-internal/v1/dynamicConfiguration") Review Comment: ```suggestion @Path("/druid-internal/v1/config") ``` ########## server/src/main/java/org/apache/druid/server/DruidInternalDynamicConfigResource.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server; + +import com.google.inject.Inject; +import com.sun.jersey.spi.container.ResourceFilters; +import org.apache.druid.client.CoordinatorDynamicConfigView; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.http.security.ConfigResourceFilter; + +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +@Path("/druid-internal/v1/dynamicConfiguration") +public class DruidInternalDynamicConfigResource +{ + private final CoordinatorDynamicConfigView coordinatorDynamicConfigView; + + @Inject + public DruidInternalDynamicConfigResource(CoordinatorDynamicConfigView coordinatorDynamicConfigView) + { + this.coordinatorDynamicConfigView = coordinatorDynamicConfigView; + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(ConfigResourceFilter.class) + @Path("/coordinatorDynamicConfig") + public Response getDynamicConfig() + { + return Response.ok(coordinatorDynamicConfigView.getDynamicConfiguration()).build(); + } + + @POST + @Consumes(MediaType.APPLICATION_JSON) + @ResourceFilters(ConfigResourceFilter.class) + @Path("/coordinatorDynamicConfig") Review Comment: ```suggestion @Path("/coordinator") ``` ########## server/src/main/java/org/apache/druid/server/coordinator/CloneStatusManager.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.server.coordinator.loading.SegmentAction; +import org.apache.druid.timeline.DataSegment; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +public class CloneStatusManager +{ + private final AtomicReference<Map<String, CloneStatusMetrics>> cloneStatusSnapshot; + + public CloneStatusManager() + { + this.cloneStatusSnapshot = new AtomicReference<>(ImmutableMap.of()); + } + + public Map<String, CloneStatusMetrics> getStatusForAllServers() + { + return cloneStatusSnapshot.get(); + } + + public CloneStatusMetrics getStatusForServer(String targetServer) Review Comment: mark this method as nullable. ########## docs/api-reference/service-status-api.md: ########## @@ -1334,4 +1334,126 @@ Host: http://BROKER_IP:BROKER_PORT #### Sample response -A successful response to this endpoint results in an empty response body. \ No newline at end of file +A successful response to this endpoint results in an empty response body. + +### Get Historical Cloning Status + +Retrieves the current status of Historical cloning. 
+ +#### URL + +`GET` `/druid/coordinator/v1/cloneStatus` + +#### Responses + +<Tabs> + +<TabItem value="56" label="200 SUCCESS"> + + +<br/> + +*Successfully retrieved cloning status* + +</TabItem> +</Tabs> + +#### Sample request + +<Tabs> + +<TabItem value="58" label="cURL"> + + +```shell +curl "http://COORDINATOR_IP:COORDINATOR_PORT/druid/coordinator/v1/cloneStatus" +``` + +</TabItem> +<TabItem value="59" label="HTTP"> + + +```http +GET /druid/coordinator/v1/cloneStatus HTTP/1.1 +Host: http://COORDINATOR_IP:COORDINATOR_PORT +``` + +</TabItem> +</Tabs> + +#### Sample response + +<details> + <summary>View the response</summary> + +```json +{ + "localhost:8083": { + "sourceServer": "localhost:8089", + "status": "LOADING", + "segmentLoadsRemaining": 0, + "segmenetsDropsRemaining": 0, + "bytesRemaining": 0 + } +} +``` + +</details> + +### Get Broker dynamic configuration view + +Retrieves the list of Brokers which have an up-to-date view of Coordinator dynamic configuration. + +#### URL + +`GET` `/druid/coordinator/v1/brokerConfigurationStatus` + +#### Responses + +<Tabs> + +<TabItem value="56" label="200 SUCCESS"> + + +<br/> + +*Successfully retrieved Broker Configuration view* + +</TabItem> +</Tabs> + +#### Sample request + +<Tabs> + +<TabItem value="58" label="cURL"> + + +```shell +curl "http://COORDINATOR_IP:COORDINATOR_PORT/druid/coordinator/v1/brokerConfigurationStatus" +``` + +</TabItem> +<TabItem value="59" label="HTTP"> + + +```http +GET /druid/coordinator/v1/brokerConfigurationStatus HTTP/1.1 +Host: http://COORDINATOR_IP:COORDINATOR_PORT +``` + +</TabItem> +</Tabs> + +#### Sample response + +<details> + <summary>View the response</summary> + +```json +[ + "localhost:8082" +] Review Comment: Include more info in the response, and avoid returning a JSON array. Return a JSON object instead. A JSON object lends itself better to be extended easily in the future. 
``` "syncedBrokers": [ {"host": "broker1", "lastSyncTime": 123456789}, {"host": "broker2", "lastSyncTime": 123456789} ] ``` ########## server/src/main/java/org/apache/druid/server/DruidInternalDynamicConfigResource.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server; + +import com.google.inject.Inject; +import com.sun.jersey.spi.container.ResourceFilters; +import org.apache.druid.client.CoordinatorDynamicConfigView; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.http.security.ConfigResourceFilter; + +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +@Path("/druid-internal/v1/dynamicConfiguration") +public class DruidInternalDynamicConfigResource +{ + private final CoordinatorDynamicConfigView coordinatorDynamicConfigView; + + @Inject + public DruidInternalDynamicConfigResource(CoordinatorDynamicConfigView coordinatorDynamicConfigView) + { + this.coordinatorDynamicConfigView = coordinatorDynamicConfigView; + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(ConfigResourceFilter.class) + @Path("/coordinatorDynamicConfig") Review Comment: ```suggestion @Path("/coordinator") ``` ########## server/src/main/java/org/apache/druid/server/coordinator/CloneStatusManager.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.server.coordinator.loading.SegmentAction; +import org.apache.druid.timeline.DataSegment; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +public class CloneStatusManager Review Comment: Please add short javadocs for the class and the methods. ########## server/src/main/java/org/apache/druid/server/DruidInternalDynamicConfigResource.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server; + +import com.google.inject.Inject; +import org.apache.druid.client.DynamicConfigurationManager; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; + +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +@Path("/druid-internal/v1/dynamicConfiguration") +public class DruidInternalDynamicConfigResource Review Comment: Sure, that's fair. ########## server/src/main/java/org/apache/druid/server/coordinator/CloneStatusManager.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordinator; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.server.coordinator.loading.SegmentAction; +import org.apache.druid.timeline.DataSegment; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +public class CloneStatusManager +{ + private final AtomicReference<Map<String, CloneStatusMetrics>> cloneStatusSnapshot; + + public CloneStatusManager() + { + this.cloneStatusSnapshot = new AtomicReference<>(ImmutableMap.of()); + } + + public Map<String, CloneStatusMetrics> getStatusForAllServers() + { + return cloneStatusSnapshot.get(); + } + + public CloneStatusMetrics getStatusForServer(String targetServer) + { + return cloneStatusSnapshot.get().get(targetServer); + } + + public void updateStats(Map<String, ServerHolder> historicalMap, Map<String, String> cloneServers) + { + final Map<String, CloneStatusMetrics> newStatusMap = new HashMap<>(); + + for (Map.Entry<String, String> entry : cloneServers.entrySet()) { + final String targetServerName = entry.getKey(); + final ServerHolder targetServer = historicalMap.get(entry.getKey()); + final String sourceServerName = entry.getValue(); + + long segmentLoad = 0L; + long bytesLeft = 0L; + long segmentDrop = 0L; + + CloneStatusMetrics newStatus; + if (targetServer == null) { + newStatus = CloneStatusMetrics.unknown(sourceServerName); + } else { + + CloneStatusMetrics.Status status; + if (!historicalMap.containsKey(sourceServerName)) { + status = CloneStatusMetrics.Status.SOURCE_SERVER_MISSING; + } else { + status = CloneStatusMetrics.Status.LOADING; + } + + for (Map.Entry<DataSegment, SegmentAction> queuedSegment : targetServer.getQueuedSegments().entrySet()) { + if (queuedSegment.getValue().isLoad()) { + segmentLoad += 1; + bytesLeft += queuedSegment.getKey().getSize(); + } else { + segmentDrop += 1; + } + } + newStatus = new CloneStatusMetrics(sourceServerName, status, segmentLoad, segmentDrop, 
bytesLeft); + } + newStatusMap.put(targetServerName, newStatus); + } + + cloneStatusSnapshot.set(ImmutableMap.copyOf(newStatusMap)); Review Comment: Let's move all of this logic into the duty and just send the final status objects to this class. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java: ########## @@ -162,7 +163,7 @@ public List<InputSlice> sliceDynamic( */ int findWorkerForServerSelector(final ServerSelector serverSelector, final int maxNumSlices) { - final QueryableDruidServer server = serverSelector.pick(null); + final QueryableDruidServer server = serverSelector.pick(null, HistoricalFilter.IDENTITY_FILTER); Review Comment: +1 ########## server/src/main/java/org/apache/druid/client/selector/HistoricalFilter.java: ########## @@ -0,0 +1,39 @@ +package org.apache.druid.client.selector; + +import com.google.common.collect.ImmutableSet; +import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; +import org.apache.druid.client.QueryableDruidServer; + +import java.util.Set; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class HistoricalFilter implements Function<Int2ObjectRBTreeMap<Set<QueryableDruidServer>>, Int2ObjectRBTreeMap<Set<QueryableDruidServer>>> Review Comment: This still needs to be addressed. ########## server/src/main/java/org/apache/druid/server/coordinator/CloneStatusManager.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.server.coordinator.loading.SegmentAction; +import org.apache.druid.timeline.DataSegment; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +public class CloneStatusManager +{ + private final AtomicReference<Map<String, CloneStatusMetrics>> cloneStatusSnapshot; + + public CloneStatusManager() + { + this.cloneStatusSnapshot = new AtomicReference<>(ImmutableMap.of()); + } + + public Map<String, CloneStatusMetrics> getStatusForAllServers() + { + return cloneStatusSnapshot.get(); + } + + public CloneStatusMetrics getStatusForServer(String targetServer) + { + return cloneStatusSnapshot.get().get(targetServer); + } + + public void updateStats(Map<String, ServerHolder> historicalMap, Map<String, String> cloneServers) Review Comment: ```suggestion public void updateStatus(Map<String, ServerHolder> historicalMap, Map<String, String> cloneServers) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
