This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ec25d8706cb Address NPE in CloneHistoricals duty (#18274)
ec25d8706cb is described below

commit ec25d8706cb63672c9da8dec255b79cba46288cc
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Jul 18 15:00:06 2025 +0530

    Address NPE in CloneHistoricals duty (#18274)
---
 .../embedded/server/HistoricalCloningTest.java     | 148 +++++++++++++++++++++
 .../server/coordinator/duty/CloneHistoricals.java  |  60 ++++++++-
 2 files changed, 201 insertions(+), 7 deletions(-)

diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HistoricalCloningTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HistoricalCloningTest.java
new file mode 100644
index 00000000000..691a8279d44
--- /dev/null
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HistoricalCloningTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class HistoricalCloningTest extends EmbeddedClusterTestBase
+{
+  private final EmbeddedHistorical historical1 = new EmbeddedHistorical();
+  private final EmbeddedHistorical historical2 = new EmbeddedHistorical()
+      .addProperty("druid.plaintextPort", "7083");
+  private final EmbeddedCoordinator coordinator1 = new EmbeddedCoordinator();
+  private final EmbeddedCoordinator coordinator2 = new EmbeddedCoordinator()
+      .addProperty("druid.plaintextPort", "7081");
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+                               .useLatchableEmitter()
+                               .addServer(overlord)
+                               .addServer(coordinator1)
+                               .addServer(coordinator2)
+                               .addServer(new EmbeddedIndexer())
+                               .addServer(historical1)
+                               .addServer(new EmbeddedBroker())
+                               .addServer(new EmbeddedRouter());
+  }
+
+  @Test
+  public void test_cloneHistoricals_inTurboMode_duringCoordinatorLeaderSwitch() throws Exception
+  {
+    cluster.callApi().onLeaderCoordinator(
+        c -> c.updateRulesForDatasource(
+            dataSource,
+            List.of(new ForeverLoadRule(Map.of("_default_tier", 1), null))
+        )
+    );
+    cluster.callApi().onLeaderCoordinator(
+        c -> c.updateCoordinatorDynamicConfig(
+            CoordinatorDynamicConfig
+                .builder()
+                .withCloneServers(Map.of("localhost:7083", "localhost:8083"))
+                .withTurboLoadingNodes(Set.of("localhost:7083"))
+                .build()
+        )
+    );
+
+    runIngestion();
+
+    // Wait for segments to be loaded on historical1
+    coordinator1.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("segment/loadQueue/success")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasSumAtLeast(10)
+    );
+    coordinator1.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("segment/loadQueue/success")
+                      .hasDimension("server", historical1.bindings().selfNode().getHostAndPort())
+                      .hasDimension("description", "LOAD: NORMAL"),
+        agg -> agg.hasSumAtLeast(10)
+    );
+
+    // Switch coordinator leader to force syncer to reset
+    coordinator1.stop();
+
+    // Wait for a few coordinator runs so that the server views are refreshed
+    coordinator2.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("coordinator/time")
+                      .hasDimension("dutyGroup", "HistoricalManagementDuties"),
+        agg -> agg.hasCountAtLeast(2)
+    );
+
+    // Add historical2 to the cluster
+    cluster.addServer(historical2);
+    historical2.start();
+
+    // Wait for the clones to be loaded
+    coordinator2.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("segment/clone/assigned/count")
+                      .hasDimension("server", historical2.bindings().selfNode().getHostAndPort()),
+        agg -> agg.hasSumAtLeast(10)
+    );
+    coordinator2.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("segment/loadQueue/success")
+                      .hasDimension("server", historical2.bindings().selfNode().getHostAndPort())
+                      .hasDimension("description", "LOAD: TURBO"),
+        agg -> agg.hasSumAtLeast(10)
+    );
+  }
+
+  private void runIngestion()
+  {
+    final String taskId = IdUtils.getRandomId();
+    final Object task = createIndexTaskForInlineData(
+        taskId,
+        StringUtils.replace(Resources.CSV_DATA_10_DAYS, "\n", "\\n")
+    );
+
+    cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
+    cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+  }
+
+  private Object createIndexTaskForInlineData(String taskId, String inlineDataCsv)
+  {
+    return EmbeddedClusterApis.createTaskFromPayload(
+        taskId,
+        StringUtils.format(Resources.INDEX_TASK_PAYLOAD_WITH_INLINE_DATA, inlineDataCsv, dataSource)
+    );
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
index 70b4e0a950b..fd4c293afc0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.server.coordinator.duty;
 
+import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.CloneStatusManager;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
@@ -34,6 +35,7 @@ import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
 
+import javax.annotation.Nullable;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -102,18 +104,15 @@ public class CloneHistoricals implements CoordinatorDuty
      final Set<DataSegment> targetProjectedSegments = targetServer.getProjectedSegments();
       // Load any segments missing in the clone target.
       for (DataSegment segment : sourceProjectedSegments) {
-        if (!targetProjectedSegments.contains(segment) && loadQueueManager.loadSegment(segment, targetServer, SegmentAction.LOAD)) {
-          stats.add(
-              Stats.Segments.ASSIGNED_TO_CLONE,
-              RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
-              1L
-          );
+        if (!targetProjectedSegments.contains(segment)) {
+          loadSegmentOnTargetServer(segment, targetServer, params);
         }
       }
 
       // Drop any segments missing from the clone source.
       for (DataSegment segment : targetProjectedSegments) {
-        if (!sourceProjectedSegments.contains(segment) && loadQueueManager.dropSegment(segment, targetServer)) {
+        if (!sourceProjectedSegments.contains(segment)
+            && loadQueueManager.dropSegment(segment, targetServer)) {
           stats.add(
               Stats.Segments.DROPPED_FROM_CLONE,
               RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
@@ -129,6 +128,53 @@ public class CloneHistoricals implements CoordinatorDuty
     return params;
   }
 
+  private void loadSegmentOnTargetServer(
+      DataSegment segment,
+      ServerHolder targetServer,
+      DruidCoordinatorRuntimeParams params
+  )
+  {
+    final RowKey.Builder rowKey = RowKey
+        .with(Dimension.SERVER, targetServer.getServer().getName())
+        .with(Dimension.DATASOURCE, segment.getDataSource());
+
+    final DataSegment loadableSegment = getLoadableSegment(segment, params);
+    if (loadableSegment == null) {
+      params.getCoordinatorStats().add(
+          Stats.Segments.ASSIGN_SKIPPED,
+          rowKey.and(Dimension.DESCRIPTION, "Segment not found in metadata cache"),
+          1L
+      );
+    } else if (loadQueueManager.loadSegment(loadableSegment, targetServer, SegmentAction.LOAD)) {
+      params.getCoordinatorStats().add(
+          Stats.Segments.ASSIGNED_TO_CLONE,
+          rowKey.build(),
+          1L
+      );
+    }
+  }
+
+  /**
+   * Returns a DataSegment with the correct value of loadSpec (as obtained from
+   * metadata store). This method may return null if there is no snapshot available
+   * for the underlying datasource or if the segment is unused.
+   */
+  @Nullable
+  private DataSegment getLoadableSegment(DataSegment segmentToMove, DruidCoordinatorRuntimeParams params)
+  {
+    if (!params.isUsedSegment(segmentToMove)) {
+      return null;
+    }
+
+    ImmutableDruidDataSource datasource = params.getDataSourcesSnapshot()
+                                                .getDataSource(segmentToMove.getDataSource());
+    if (datasource == null) {
+      return null;
+    }
+
+    return datasource.getSegment(segmentToMove.getId());
+  }
+
   /**
   * Create a status map of cloning progress based on the cloneServers mapping and its current load queue.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to