This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ec25d8706cb Address NPE in CloneHistoricals duty (#18274)
ec25d8706cb is described below
commit ec25d8706cb63672c9da8dec255b79cba46288cc
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Jul 18 15:00:06 2025 +0530
Address NPE in CloneHistoricals duty (#18274)
---
.../embedded/server/HistoricalCloningTest.java | 148 +++++++++++++++++++++
.../server/coordinator/duty/CloneHistoricals.java | 60 ++++++++-
2 files changed, 201 insertions(+), 7 deletions(-)
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HistoricalCloningTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HistoricalCloningTest.java
new file mode 100644
index 00000000000..691a8279d44
--- /dev/null
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HistoricalCloningTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class HistoricalCloningTest extends EmbeddedClusterTestBase
+{
+ private final EmbeddedHistorical historical1 = new EmbeddedHistorical();
+ private final EmbeddedHistorical historical2 = new EmbeddedHistorical()
+ .addProperty("druid.plaintextPort", "7083");
+ private final EmbeddedCoordinator coordinator1 = new EmbeddedCoordinator();
+ private final EmbeddedCoordinator coordinator2 = new EmbeddedCoordinator()
+ .addProperty("druid.plaintextPort", "7081");
+ private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+
+ @Override
+ protected EmbeddedDruidCluster createCluster()
+ {
+ return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+ .useLatchableEmitter()
+ .addServer(overlord)
+ .addServer(coordinator1)
+ .addServer(coordinator2)
+ .addServer(new EmbeddedIndexer())
+ .addServer(historical1)
+ .addServer(new EmbeddedBroker())
+ .addServer(new EmbeddedRouter());
+ }
+
+ @Test
+ public void test_cloneHistoricals_inTurboMode_duringCoordinatorLeaderSwitch() throws Exception
+ {
+ cluster.callApi().onLeaderCoordinator(
+ c -> c.updateRulesForDatasource(
+ dataSource,
+ List.of(new ForeverLoadRule(Map.of("_default_tier", 1), null))
+ )
+ );
+ cluster.callApi().onLeaderCoordinator(
+ c -> c.updateCoordinatorDynamicConfig(
+ CoordinatorDynamicConfig
+ .builder()
+ .withCloneServers(Map.of("localhost:7083", "localhost:8083"))
+ .withTurboLoadingNodes(Set.of("localhost:7083"))
+ .build()
+ )
+ );
+
+ runIngestion();
+
+ // Wait for segments to be loaded on historical1
+ coordinator1.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("segment/loadQueue/success")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+ agg -> agg.hasSumAtLeast(10)
+ );
+ coordinator1.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("segment/loadQueue/success")
+ .hasDimension("server", historical1.bindings().selfNode().getHostAndPort())
+ .hasDimension("description", "LOAD: NORMAL"),
+ agg -> agg.hasSumAtLeast(10)
+ );
+
+ // Switch coordinator leader to force syncer to reset
+ coordinator1.stop();
+
+ // Wait for a few coordinator runs so that the server views are refreshed
+ coordinator2.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("coordinator/time")
+ .hasDimension("dutyGroup", "HistoricalManagementDuties"),
+ agg -> agg.hasCountAtLeast(2)
+ );
+
+ // Add historical2 to the cluster
+ cluster.addServer(historical2);
+ historical2.start();
+
+ // Wait for the clones to be loaded
+ coordinator2.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("segment/clone/assigned/count")
+ .hasDimension("server", historical2.bindings().selfNode().getHostAndPort()),
+ agg -> agg.hasSumAtLeast(10)
+ );
+ coordinator2.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("segment/loadQueue/success")
+ .hasDimension("server", historical2.bindings().selfNode().getHostAndPort())
+ .hasDimension("description", "LOAD: TURBO"),
+ agg -> agg.hasSumAtLeast(10)
+ );
+ }
+
+ private void runIngestion()
+ {
+ final String taskId = IdUtils.getRandomId();
+ final Object task = createIndexTaskForInlineData(
+ taskId,
+ StringUtils.replace(Resources.CSV_DATA_10_DAYS, "\n", "\\n")
+ );
+
+ cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
+ cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+ }
+
+ private Object createIndexTaskForInlineData(String taskId, String inlineDataCsv)
+ {
+ return EmbeddedClusterApis.createTaskFromPayload(
+ taskId,
+ StringUtils.format(Resources.INDEX_TASK_PAYLOAD_WITH_INLINE_DATA, inlineDataCsv, dataSource)
+ );
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
index 70b4e0a950b..fd4c293afc0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
@@ -19,6 +19,7 @@
package org.apache.druid.server.coordinator.duty;
+import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.CloneStatusManager;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
@@ -34,6 +35,7 @@ import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
+import javax.annotation.Nullable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -102,18 +104,15 @@ public class CloneHistoricals implements CoordinatorDuty
final Set<DataSegment> targetProjectedSegments =
targetServer.getProjectedSegments();
// Load any segments missing in the clone target.
for (DataSegment segment : sourceProjectedSegments) {
- if (!targetProjectedSegments.contains(segment) && loadQueueManager.loadSegment(segment, targetServer, SegmentAction.LOAD)) {
- stats.add(
- Stats.Segments.ASSIGNED_TO_CLONE,
- RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
- 1L
- );
+ if (!targetProjectedSegments.contains(segment)) {
+ loadSegmentOnTargetServer(segment, targetServer, params);
}
}
// Drop any segments missing from the clone source.
for (DataSegment segment : targetProjectedSegments) {
- if (!sourceProjectedSegments.contains(segment) && loadQueueManager.dropSegment(segment, targetServer)) {
+ if (!sourceProjectedSegments.contains(segment)
+ && loadQueueManager.dropSegment(segment, targetServer)) {
stats.add(
Stats.Segments.DROPPED_FROM_CLONE,
RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
@@ -129,6 +128,53 @@ public class CloneHistoricals implements CoordinatorDuty
return params;
}
+ private void loadSegmentOnTargetServer(
+ DataSegment segment,
+ ServerHolder targetServer,
+ DruidCoordinatorRuntimeParams params
+ )
+ {
+ final RowKey.Builder rowKey = RowKey
+ .with(Dimension.SERVER, targetServer.getServer().getName())
+ .with(Dimension.DATASOURCE, segment.getDataSource());
+
+ final DataSegment loadableSegment = getLoadableSegment(segment, params);
+ if (loadableSegment == null) {
+ params.getCoordinatorStats().add(
+ Stats.Segments.ASSIGN_SKIPPED,
+ rowKey.and(Dimension.DESCRIPTION, "Segment not found in metadata cache"),
+ 1L
+ );
+ } else if (loadQueueManager.loadSegment(loadableSegment, targetServer, SegmentAction.LOAD)) {
+ params.getCoordinatorStats().add(
+ Stats.Segments.ASSIGNED_TO_CLONE,
+ rowKey.build(),
+ 1L
+ );
+ }
+ }
+
+ /**
+ * Returns a DataSegment with the correct value of loadSpec (as obtained from
+ * metadata store). This method may return null if there is no snapshot available
+ * for the underlying datasource or if the segment is unused.
+ */
+ @Nullable
+ private DataSegment getLoadableSegment(DataSegment segmentToMove, DruidCoordinatorRuntimeParams params)
+ {
+ if (!params.isUsedSegment(segmentToMove)) {
+ return null;
+ }
+
+ ImmutableDruidDataSource datasource = params.getDataSourcesSnapshot()
+ .getDataSource(segmentToMove.getDataSource());
+ if (datasource == null) {
+ return null;
+ }
+
+ return datasource.getSegment(segmentToMove.getId());
+ }
+
/**
* Create a status map of cloning progress based on the cloneServers mapping
and its current load queue.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]