This is an automated email from the ASF dual-hosted git repository.

Jackie-Jiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f231ee02c10 Fix race condition in IS update (#18559)
f231ee02c10 is described below

commit f231ee02c104c3a5a13ed1f53fe9ba2f43bbb8b0
Author: NOOB <[email protected]>
AuthorDate: Tue May 26 02:46:42 2026 +0530

    Fix race condition in IS update (#18559)
---
 .../common/utils/helix/IdealStateGroupCommit.java  |  36 +-
 .../helix/IdealStateGroupCommitTest.java           | 414 +++++++++++++++++++++
 2 files changed, 441 insertions(+), 9 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
index 9d1e1a1330f..4a2d83c65c2 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
@@ -73,6 +73,11 @@ public class IdealStateGroupCommit {
     IdealState _updatedIdealState = null;
     AtomicBoolean _sent = new AtomicBoolean(false);
     Throwable _exception;
+    // Set to true when the entry's owning thread (the leader of a failed 
batch) wants the next
+    // leader to skip this entry instead of applying it. Read in the iteration 
loop. Volatile so
+    // the next leader sees it without synchronization. This is the O(1) 
alternative to calling
+    // ConcurrentLinkedQueue.remove(entry) (which is O(n)) in the catch path.
+    volatile boolean _cancelled = false;
 
     Entry(String resourceName, Function<IdealState, IdealState> updater) {
       _resourceName = resourceName;
@@ -123,7 +128,7 @@ public class IdealStateGroupCommit {
             // All pending entries have been processed, the updatedIdealState 
should be set.
             return entry._updatedIdealState;
           }
-          IdealState response = updateIdealState(helixManager, resourceName, 
idealState -> {
+          updateIdealState(helixManager, resourceName, idealState -> {
             IdealState updatedIdealState = idealState;
             if (!processed.isEmpty()) {
               queue._pending.addAll(processed);
@@ -135,6 +140,15 @@ public class IdealStateGroupCommit {
               if (!ent._resourceName.equals(resourceName)) {
                 continue;
               }
+              if (ent._cancelled) {
+                // Cancelled by a previous failed batch's leader (its commit() 
catch). Skip and
+                // remove so this batch never applies an updater whose owner 
has already given up.
+                // Without this, the owner's catch (e.g. cleanup of a 
newly-created segment ZK
+                // metadata in the pauseless commit path) races against this 
batch's CAS write
+                // and produces an "in IdealState, no ZK metadata" orphan.
+                it.remove();
+                continue;
+              }
               processed.add(ent);
               it.remove();
               updatedIdealState = ent._updater.apply(updatedIdealState);
@@ -143,15 +157,19 @@ public class IdealStateGroupCommit {
             }
             return updatedIdealState;
           }, retryPolicy, noChangeOk);
-          if (response == null) {
-            RuntimeException ex = new RuntimeException("Failed to update 
IdealState");
-            for (Entry ent : processed) {
-              ent._exception = ex;
-              ent._updatedIdealState = null;
-            }
-            throw ex;
-          }
         } catch (Throwable e) {
+          // If this leader's own entry was never iterated by the failed 
batch, it's still
+          // sitting in _pending. Without cancelling it, the NEXT leader will 
iterate it and
+          // apply its updater -- after this thread's caller has already 
concluded the commit
+          // failed and run its cleanup (e.g., cleanup of a newly-created 
segment's ZK metadata
+          // in the pauseless commit path). That race produces an "in 
IdealState, no ZK
+          // metadata" orphan. We set the cancellation flag (O(1)) instead of 
calling
+          // ConcurrentLinkedQueue.remove (O(n)); the next leader's iteration 
check skips and
+          // removes any cancelled entry. If our entry was iterated (already 
in `processed`),
+          // the flag is harmless. We don't add `entry` to `processed` because 
nothing reads
+          // its _exception or waits on its _sent: the owner thread IS this 
thread, and it's
+          // about to exit commit() via the throw below.
+          entry._cancelled = true;
           // If there is an exception, set the exception for all processed 
entries
           for (Entry ent : processed) {
             ent._exception = e;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java
index 37728cacf13..7b17f7549fb 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java
@@ -18,18 +18,30 @@
  */
 package org.apache.pinot.controller.helix;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import org.apache.helix.HelixManager;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.helix.IdealStateGroupCommit;
+import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,6 +102,408 @@ public class IdealStateGroupCommitTest {
     TEST_INSTANCE.cleanup();
   }
 
+  /**
+   * Regression test for the leader-entry cancellation fix in {@link 
IdealStateGroupCommit#commit}.
+   *
+   * Setup mirrors the pre-fix race condition that produced "in IdealState, no 
ZK metadata"
+   * orphans in the pauseless segment commit path:
+   *
+   *   1. The queue already contains an entry whose updater will throw 
PermanentUpdaterException
+   *      (a pauseless segment whose 300s max-completion-time deadline has 
passed while waiting
+   *      behind a long IdealState operation such as a bulk retention 
deletion).
+   *   2. A fresh thread calls commit() with a healthy updater. Its entry is 
enqueued AFTER the
+   *      stuck entry in FIFO order.
+   *   3. The fresh thread becomes leader and iterates the queue in FIFO order.
+   *
+   * Pre-fix behavior (the bug): the stuck entry's updater threw, iteration 
stopped, the fresh
+   * thread's commit() exited with that exception, but its OWN queued entry 
was never iterated and
+   * stayed in _pending. A subsequent leader applied it. The fresh thread's 
caller observed "my
+   * update failed" and ran cleanup (removing the newly-created segment's ZK 
metadata), yet the
+   * subsequent leader's CAS put the segment in IdealState -- producing the 
orphan.
+   *
+   * Post-fix behavior (asserted here): the catch in commit() sets _cancelled 
on the fresh
+   * thread's own entry and adds it to `processed`. The fresh thread's 
commit() still throws
+   * (pre-fix all-or-nothing batch semantics are preserved), but a subsequent 
leader's iteration
+   * sees the cancelled entry, skips it, and removes it. No orphan can form 
because the fresh
+   * updater is never applied to IdealState by any future batch.
+   */
+  @Test
+  public void testFreshUpdaterAppliedAfterCallerThrows()
+      throws Exception {
+    String tableName = TABLE_NAME_PREFIX + "race_OFFLINE";
+    IdealState initialState = new IdealState(tableName);
+    initialState.setStateModelDefRef("OnlineOffline");
+    initialState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+    initialState.setReplicas("1");
+    initialState.setNumPartitions(0);
+    
TEST_INSTANCE.getHelixAdmin().addResource(TEST_INSTANCE.getHelixClusterName(), 
tableName, initialState);
+
+    try {
+      IdealStateGroupCommit commit = new IdealStateGroupCommit();
+
+      // Step 1: Pre-populate the queue with a stuck entry whose updater throws
+      // PermanentUpdaterException. Simulates a queued entry for a pauseless 
segment whose 300s
+      // deadline has expired.
+      injectStuckEntry(commit, tableName);
+
+      // Step 2: Fresh thread submits a healthy updater that adds a partition.
+      Function<IdealState, IdealState> freshUpdater = is -> {
+        is.setPartitionState("freshPartition", "instance1", "ONLINE");
+        return is;
+      };
+
+      Throwable freshException = null;
+      try {
+        commit.commit(TEST_INSTANCE.getHelixManager(), tableName, freshUpdater,
+            RetryPolicies.noDelayRetryPolicy(1), false);
+      } catch (Throwable e) {
+        freshException = e;
+      }
+
+      LOGGER.info("Fresh thread's commit() returned with exception: {}",
+          freshException == null ? "(none)"
+              : freshException.getClass().getSimpleName() + ": " + 
freshException.getMessage());
+
+      // The fresh thread's commit() throws (the all-or-nothing batch 
semantics are preserved).
+      Assert.assertNotNull(freshException,
+          "Fresh thread's commit() should throw because the batched IdealState 
commit aborts when "
+              + "the co-batched stuck entry's lambda throws 
PermanentUpdaterException.");
+
+      // Step 3: A drainer commit runs. If the fresh thread's entry was still 
in _pending and
+      // NOT cancelled (the pre-fix bug), the drainer would iterate it and 
write freshPartition
+      // into IdealState -- creating the orphan when paired with a caller that 
has already
+      // cleaned up new segment metadata. With the cancellation fix, the entry 
is skipped.
+      Function<IdealState, IdealState> drainerUpdater = is -> {
+        is.setPartitionState("drainerPartition", "instance1", "ONLINE");
+        return is;
+      };
+      commit.commit(TEST_INSTANCE.getHelixManager(), tableName, drainerUpdater,
+          RetryPolicies.noDelayRetryPolicy(1), false);
+
+      IdealState finalState = 
HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), tableName);
+      Map<String, String> freshMap = 
finalState.getInstanceStateMap("freshPartition");
+      Map<String, String> drainerMap = 
finalState.getInstanceStateMap("drainerPartition");
+
+      LOGGER.info("Final IdealState partitions: {}", 
finalState.getPartitionSet());
+      LOGGER.info("freshPartition state map:    {}", freshMap);
+      LOGGER.info("drainerPartition state map:  {}", drainerMap);
+
+      // Sanity: the drainer's own change must be present (proves the drainer 
commit ran).
+      Assert.assertNotNull(drainerMap, "Drainer commit should have written 
drainerPartition.");
+      Assert.assertEquals(drainerMap.get("instance1"), "ONLINE");
+
+      // The key assertion: the fresh updater's change must NOT be in 
IdealState. The cancellation
+      // flag must have caused the drainer's iteration to skip and remove the 
fresh entry. If this
+      // assertion fails, the cancellation fix has regressed and the 
orphan-creation race can
+      // reoccur (caller threw, ran cleanup, but subsequent leader applied the 
update anyway).
+      Assert.assertNull(freshMap,
+          "Fresh updater's change must NOT be in IdealState. The fresh 
thread's commit() threw, "
+              + "indicating to its caller that the update did not happen; a 
subsequent leader "
+              + "must not have applied it. Found: " + freshMap);
+
+      LOGGER.info("=== FIX VERIFIED: caller threw, no future leader applied 
the cancelled entry ===");
+    } finally {
+      
TEST_INSTANCE.getHelixAdmin().dropResource(TEST_INSTANCE.getHelixClusterName(), 
tableName);
+    }
+  }
+
+  /**
+   * Multi-thread consistency test: N stuck owners + M fresh owners race to 
commit on the same
+   * resource. The post-fix contract is per-owner consistency, not per-owner 
success: every owner
+   * either observes a successful commit AND its update is present in 
IdealState, OR observes an
+   * exception AND its update is NOT present in IdealState. The cancellation 
flag must ensure no
+   * orphan can be left behind by any failed leader.
+   */
+  @Test
+  public void testMultipleStuckAndFreshConsistency()
+      throws Exception {
+    String tableName = TABLE_NAME_PREFIX + "multi_OFFLINE";
+    IdealState initialState = new IdealState(tableName);
+    initialState.setStateModelDefRef("OnlineOffline");
+    initialState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+    initialState.setReplicas("1");
+    initialState.setNumPartitions(0);
+    
TEST_INSTANCE.getHelixAdmin().addResource(TEST_INSTANCE.getHelixClusterName(), 
tableName, initialState);
+
+    int stuckCount = 3;
+    int freshCount = 5;
+    IdealStateGroupCommit commit = new IdealStateGroupCommit();
+    ExecutorService pool = Executors.newFixedThreadPool(stuckCount + 
freshCount);
+
+    try {
+      CountDownLatch allReady = new CountDownLatch(stuckCount + freshCount);
+      CountDownLatch goSignal = new CountDownLatch(1);
+
+      List<AtomicReference<Throwable>> stuckResults = new ArrayList<>();
+      for (int i = 0; i < stuckCount; i++) {
+        AtomicReference<Throwable> result = new AtomicReference<>();
+        stuckResults.add(result);
+        final int idx = i;
+        pool.submit(() -> {
+          allReady.countDown();
+          try {
+            goSignal.await();
+          } catch (InterruptedException ignored) {
+          }
+          Function<IdealState, IdealState> stuckUpdater = is -> {
+            throw new HelixHelper.PermanentUpdaterException("stuck-" + idx);
+          };
+          try {
+            commit.commit(TEST_INSTANCE.getHelixManager(), tableName, 
stuckUpdater,
+                RetryPolicies.noDelayRetryPolicy(1), false);
+          } catch (Throwable t) {
+            result.set(t);
+          }
+        });
+      }
+
+      List<AtomicReference<Throwable>> freshResults = new ArrayList<>();
+      for (int i = 0; i < freshCount; i++) {
+        AtomicReference<Throwable> result = new AtomicReference<>();
+        freshResults.add(result);
+        final int idx = i;
+        pool.submit(() -> {
+          allReady.countDown();
+          try {
+            goSignal.await();
+          } catch (InterruptedException ignored) {
+          }
+          Function<IdealState, IdealState> freshUpdater = is -> {
+            is.setPartitionState("freshPart-" + idx, "instance1", "ONLINE");
+            return is;
+          };
+          try {
+            commit.commit(TEST_INSTANCE.getHelixManager(), tableName, 
freshUpdater,
+                RetryPolicies.noDelayRetryPolicy(1), false);
+          } catch (Throwable t) {
+            result.set(t);
+          }
+        });
+      }
+
+      Assert.assertTrue(allReady.await(10, TimeUnit.SECONDS), "Worker threads 
failed to start");
+      goSignal.countDown();
+      pool.shutdown();
+      Assert.assertTrue(pool.awaitTermination(30, TimeUnit.SECONDS), "Workers 
did not finish");
+
+      // After the race, a drainer commit ensures any 
successful-but-not-yet-applied entries are
+      // CAS-written, and any cancelled entries are skipped+removed. This 
makes the final
+      // IdealState the ground truth for which fresh updaters actually took 
effect.
+      commit.commit(TEST_INSTANCE.getHelixManager(), tableName, is -> {
+        is.setPartitionState("drainer", "instance1", "ONLINE");
+        return is;
+      }, RetryPolicies.noDelayRetryPolicy(1), false);
+
+      IdealState finalState = 
HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), tableName);
+
+      // Every stuck owner must observe a throw. The stuck lambda always throws
+      // PermanentUpdaterException, so the batch they participated in always 
aborts.
+      for (int i = 0; i < stuckCount; i++) {
+        Throwable t = stuckResults.get(i).get();
+        Assert.assertNotNull(t, "Stuck owner " + i + " should have thrown");
+      }
+
+      // Consistency check for every fresh owner: observed outcome must match 
IdealState.
+      int freshSucceeded = 0;
+      int freshThrew = 0;
+      for (int i = 0; i < freshCount; i++) {
+        Throwable t = freshResults.get(i).get();
+        Map<String, String> inIs = finalState.getInstanceStateMap("freshPart-" 
+ i);
+        if (t == null) {
+          freshSucceeded++;
+          Assert.assertNotNull(inIs,
+              "Fresh owner " + i + " observed success but its partition is NOT 
in IdealState. "
+                  + "Owner outcome and IdealState are inconsistent.");
+          Assert.assertEquals(inIs.get("instance1"), "ONLINE");
+        } else {
+          freshThrew++;
+          Assert.assertNull(inIs,
+              "Fresh owner " + i + " observed exception [" + t.getMessage()
+                  + "] but its partition IS in IdealState. This is the 
orphan-creating race: "
+                  + "the cancellation flag must skip a failed leader's entry 
so no subsequent "
+                  + "batch applies it.");
+        }
+      }
+
+      LOGGER.info("=== Consistency verified: {} stuck threw, {} fresh 
succeeded, {} fresh threw ===",
+          stuckCount, freshSucceeded, freshThrew);
+    } finally {
+      if (!pool.isShutdown()) {
+        pool.shutdownNow();
+      }
+      
TEST_INSTANCE.getHelixAdmin().dropResource(TEST_INSTANCE.getHelixClusterName(), 
tableName);
+    }
+  }
+
+  /**
+   * Integration-style test that walks the same chain the orphan-creating 
production bug walked:
+   *
+   *   - Step 2 of commitSegmentMetadataInternal: write the new consuming 
segment's ZK metadata
+   *     to the property store with status IN_PROGRESS.
+   *   - Step 3: update IdealState to mark the old segment ONLINE and add the 
new segment as
+   *     CONSUMING -- via IdealStateGroupCommit.commit().
+   *   - If commit() throws, simulate the catch block in 
commitSegmentMetadataInternal: call
+   *     removeSegmentZKMetadataBestEffort on the new segment.
+   *   - Then let a drainer commit run (mimics any subsequent batch picking up 
the queue).
+   *
+   * Orphan condition: the new segment ends up in IdealState but its ZK 
metadata is gone.
+   * Pre-fix: a co-batched stuck pauseless entry threw, causing this caller's 
commit() to throw
+   * (so cleanup ran) while the caller's own updater stayed in _pending and 
was applied by the
+   * next leader -- producing the orphan.
+   *
+   * The test asserts that no orphan is produced. With the cancellation fix it 
passes -- the
+   * caller still throws (preserving pre-fix all-or-nothing batch semantics), 
but the entry is
+   * cancelled so the drainer never applies it.
+   */
+  @Test
+  public void testNoOrphanWhenCoBatchedEntryThrowsDuringStepThree()
+      throws Exception {
+    String tableName = TABLE_NAME_PREFIX + "orphan_REALTIME";
+    String oldSegment = "orphanTest__0__0__T0000Z";
+    String newSegment = "orphanTest__0__1__T0010Z";
+    String instance = "instance1";
+
+    IdealState initialState = new IdealState(tableName);
+    initialState.setStateModelDefRef("OnlineOffline");
+    initialState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+    initialState.setReplicas("1");
+    initialState.setNumPartitions(0);
+    initialState.setPartitionState(oldSegment, instance, "CONSUMING");
+    
TEST_INSTANCE.getHelixAdmin().addResource(TEST_INSTANCE.getHelixClusterName(), 
tableName, initialState);
+
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
TEST_INSTANCE.getPropertyStore();
+
+    try {
+      IdealStateGroupCommit commit = new IdealStateGroupCommit();
+
+      // Step 2 (mimicked): persist new segment's ZK metadata as IN_PROGRESS.
+      SegmentZKMetadata newSegmentMetadata = new SegmentZKMetadata(newSegment);
+      newSegmentMetadata.setStatus(Status.IN_PROGRESS);
+      boolean wrote = ZKMetadataProvider.setSegmentZKMetadata(propertyStore, 
tableName, newSegmentMetadata, -1);
+      Assert.assertTrue(wrote, "Pre-condition: writing new segment ZK metadata 
should succeed");
+
+      // Co-batched in-flight throwing entry: mimics a pauseless segment that 
has timed out
+      // and whose queued updater will throw PermanentUpdaterException when 
iterated.
+      injectStuckEntry(commit, tableName);
+
+      // Step 3 updater: flip old segment to ONLINE, add new segment as 
CONSUMING.
+      Function<IdealState, IdealState> stepThreeUpdater = is -> {
+        is.setPartitionState(oldSegment, instance, "ONLINE");
+        is.setPartitionState(newSegment, instance, "CONSUMING");
+        return is;
+      };
+
+      Throwable callerThrew = null;
+      try {
+        commit.commit(TEST_INSTANCE.getHelixManager(), tableName, 
stepThreeUpdater,
+            RetryPolicies.noDelayRetryPolicy(1), false);
+      } catch (Throwable t) {
+        callerThrew = t;
+        // Mimic commitSegmentMetadataInternal's catch: delete the new 
segment's ZK metadata
+        // best-effort because Step 3 appeared to fail.
+        ZKMetadataProvider.removeSegmentZKMetadata(propertyStore, tableName, 
newSegment);
+        LOGGER.info("Caller observed exception, ran 
removeSegmentZKMetadataBestEffort(newSegment): {}",
+            t.getMessage());
+      }
+
+      // Drainer commit: mimics any subsequent IdealStateGroupCommit batch 
that processes
+      // whatever is left in the queue. If our caller's updater were still 
queued (the pre-fix
+      // bug), the drainer would apply it here -- writing newSegment into 
IdealState even
+      // though we already deleted its ZK metadata. With the cancellation fix, 
the drainer's
+      // iteration skips the cancelled entry.
+      Function<IdealState, IdealState> drainer = is -> {
+        is.setPartitionState("drainerPartition", instance, "ONLINE");
+        return is;
+      };
+      commit.commit(TEST_INSTANCE.getHelixManager(), tableName, drainer,
+          RetryPolicies.noDelayRetryPolicy(1), false);
+
+      // Inspect ground truth.
+      IdealState finalState = 
HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), tableName);
+      Map<String, String> newSegInIs = 
finalState.getInstanceStateMap(newSegment);
+      SegmentZKMetadata newSegMetadataAfter = 
ZKMetadataProvider.getSegmentZKMetadata(
+          propertyStore, tableName, newSegment);
+
+      boolean inIdealState = newSegInIs != null;
+      boolean hasZkMetadata = newSegMetadataAfter != null;
+
+      LOGGER.info("Caller threw: {}", callerThrew != null);
+      LOGGER.info("newSegment in IdealState: {}", inIdealState);
+      LOGGER.info("newSegment has ZK metadata: {}", hasZkMetadata);
+
+      // The orphan condition is precisely "in IdealState && no ZK metadata".
+      // Pre-fix: this assertion FAILS -- caller threw, ran cleanup, but the 
queued updater was
+      // applied by the drainer.
+      // Post-fix (cancellation): the entry is cancelled in commit()'s catch; 
the drainer skips
+      // it. The newSegment is neither in IdealState nor has ZK metadata -- a 
clean failure.
+      Assert.assertFalse(inIdealState && !hasZkMetadata,
+          "ORPHAN DETECTED: newSegment is in IdealState but has no ZK 
metadata. "
+              + "callerThrew=" + (callerThrew != null ? 
callerThrew.getMessage() : "null")
+              + ". The cancellation fix in IdealStateGroupCommit must skip the 
caller's "
+              + "still-queued entry so no subsequent batch applies it.");
+
+      // Post-fix expected state: caller threw AND newSegment is neither in 
IdealState nor has
+      // ZK metadata. The cancellation prevented the queued entry from being 
applied.
+      Assert.assertNotNull(callerThrew,
+          "Post-fix expectation: caller's commit() throws (all-or-nothing 
batch semantics).");
+      Assert.assertFalse(inIdealState,
+          "newSegment must NOT be in IdealState -- the caller's cancelled 
entry must be skipped "
+              + "by the drainer's iteration.");
+      Assert.assertFalse(hasZkMetadata,
+          "newSegment's ZK metadata must have been cleaned up by the caller's 
catch.");
+    } finally {
+      // Best-effort cleanup of any stragglers. ControllerTest.cleanup() 
asserts /SEGMENTS has
+      // zero child table directories, so we must remove the table-level node 
too.
+      try {
+        ZKMetadataProvider.removeSegmentZKMetadata(propertyStore, tableName, 
newSegment);
+      } catch (Throwable ignored) {
+      }
+      try {
+        propertyStore.remove("/SEGMENTS/" + tableName, 
org.apache.helix.AccessOption.PERSISTENT);
+      } catch (Throwable ignored) {
+      }
+      
TEST_INSTANCE.getHelixAdmin().dropResource(TEST_INSTANCE.getHelixClusterName(), 
tableName);
+    }
+  }
+
+  /**
+   * Uses reflection to push a stuck Entry (whose updater throws 
PermanentUpdaterException) into
+   * IdealStateGroupCommit's internal per-resource queue, without going 
through commit(). This is
+   * how we make the FIFO race deterministic in a unit test.
+   */
+  @SuppressWarnings("unchecked")
+  private static void injectStuckEntry(IdealStateGroupCommit commit, String 
resourceName)
+      throws Exception {
+    Class<?> queueClass =
+        
Class.forName("org.apache.pinot.common.utils.helix.IdealStateGroupCommit$Queue");
+    Class<?> entryClass =
+        
Class.forName("org.apache.pinot.common.utils.helix.IdealStateGroupCommit$Entry");
+
+    Field queuesField = 
IdealStateGroupCommit.class.getDeclaredField("_queues");
+    queuesField.setAccessible(true);
+    Object[] queues = (Object[]) queuesField.get(commit);
+
+    int bucket = (resourceName.hashCode() & Integer.MAX_VALUE) % queues.length;
+    Object queue = queues[bucket];
+
+    Field pendingField = queueClass.getDeclaredField("_pending");
+    pendingField.setAccessible(true);
+    ConcurrentLinkedQueue<Object> pending = (ConcurrentLinkedQueue<Object>) 
pendingField.get(queue);
+
+    Function<IdealState, IdealState> stuckUpdater = is -> {
+      throw new HelixHelper.PermanentUpdaterException(
+          "simulated exceeded max segment completion time");
+    };
+
+    Constructor<?> entryCtor = entryClass.getDeclaredConstructor(String.class, 
Function.class);
+    entryCtor.setAccessible(true);
+    Object stuckEntry = entryCtor.newInstance(resourceName, stuckUpdater);
+
+    pending.add(stuckEntry);
+    LOGGER.info("Injected stuck (always-throwing) entry into queue bucket {} 
for resource {}",
+        bucket, resourceName);
+  }
+
   @Test(invocationCount = 5)
   public void testGroupCommit()
       throws InterruptedException {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to