This is an automated email from the ASF dual-hosted git repository.
Jackie-Jiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f231ee02c10 Fix race condition in IS update (#18559)
f231ee02c10 is described below
commit f231ee02c104c3a5a13ed1f53fe9ba2f43bbb8b0
Author: NOOB <[email protected]>
AuthorDate: Tue May 26 02:46:42 2026 +0530
Fix race condition in IS update (#18559)
---
.../common/utils/helix/IdealStateGroupCommit.java | 36 +-
.../helix/IdealStateGroupCommitTest.java | 414 +++++++++++++++++++++
2 files changed, 441 insertions(+), 9 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
index 9d1e1a1330f..4a2d83c65c2 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
@@ -73,6 +73,11 @@ public class IdealStateGroupCommit {
IdealState _updatedIdealState = null;
AtomicBoolean _sent = new AtomicBoolean(false);
Throwable _exception;
+ // Set to true when the entry's owning thread (the leader of a failed
batch) wants the next
+ // leader to skip this entry instead of applying it. Read in the iteration
loop. Volatile so
+ // the next leader sees it without synchronization. This is the O(1)
alternative to calling
+ // ConcurrentLinkedQueue.remove(entry) (which is O(n)) in the catch path.
+ volatile boolean _cancelled = false;
Entry(String resourceName, Function<IdealState, IdealState> updater) {
_resourceName = resourceName;
@@ -123,7 +128,7 @@ public class IdealStateGroupCommit {
// All pending entries have been processed, the updatedIdealState
should be set.
return entry._updatedIdealState;
}
- IdealState response = updateIdealState(helixManager, resourceName,
idealState -> {
+ updateIdealState(helixManager, resourceName, idealState -> {
IdealState updatedIdealState = idealState;
if (!processed.isEmpty()) {
queue._pending.addAll(processed);
@@ -135,6 +140,15 @@ public class IdealStateGroupCommit {
if (!ent._resourceName.equals(resourceName)) {
continue;
}
+ if (ent._cancelled) {
+ // Cancelled by a previous failed batch's leader (its commit()
catch). Skip and
+ // remove so this batch never applies an updater whose owner
has already given up.
+ // Without this, the owner's catch (e.g. cleanup of a
newly-created segment ZK
+ // metadata in the pauseless commit path) races against this
batch's CAS write
+ // and produces an "in IdealState, no ZK metadata" orphan.
+ it.remove();
+ continue;
+ }
processed.add(ent);
it.remove();
updatedIdealState = ent._updater.apply(updatedIdealState);
@@ -143,15 +157,19 @@ public class IdealStateGroupCommit {
}
return updatedIdealState;
}, retryPolicy, noChangeOk);
- if (response == null) {
- RuntimeException ex = new RuntimeException("Failed to update
IdealState");
- for (Entry ent : processed) {
- ent._exception = ex;
- ent._updatedIdealState = null;
- }
- throw ex;
- }
} catch (Throwable e) {
+ // If this leader's own entry was never iterated by the failed
batch, it's still
+ // sitting in _pending. Without cancelling it, the NEXT leader will
iterate it and
+ // apply its updater -- after this thread's caller has already
concluded the commit
+ // failed and run its cleanup (e.g., cleanup of a newly-created
segment's ZK metadata
+ // in the pauseless commit path). That race produces an "in
IdealState, no ZK
+ // metadata" orphan. We set the cancellation flag (O(1)) instead of
calling
+ // ConcurrentLinkedQueue.remove (O(n)); the next leader's iteration
check skips and
+ // removes any cancelled entry. If our entry was iterated (already
in `processed`),
+ // the flag is harmless. We don't add `entry` to `processed` because
nothing reads
+ // its _exception or waits on its _sent: the owner thread IS this
thread, and it's
+ // about to exit commit() via the throw below.
+ entry._cancelled = true;
// If there is an exception, set the exception for all processed
entries
for (Entry ent : processed) {
ent._exception = e;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java
index 37728cacf13..7b17f7549fb 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java
@@ -18,18 +18,30 @@
*/
package org.apache.pinot.controller.helix;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.helix.HelixManager;
import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.helix.IdealStateGroupCommit;
+import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,6 +102,408 @@ public class IdealStateGroupCommitTest {
TEST_INSTANCE.cleanup();
}
+ /**
+ * Regression test for the leader-entry cancellation fix in {@link
IdealStateGroupCommit#commit}.
+ *
+ * Setup mirrors the pre-fix race condition that produced "in IdealState, no
ZK metadata"
+ * orphans in the pauseless segment commit path:
+ *
+ * 1. The queue already contains an entry whose updater will throw
PermanentUpdaterException
+ * (a pauseless segment whose 300s max-completion-time deadline has
passed while waiting
+ * behind a long IdealState operation such as a bulk retention
deletion).
+ * 2. A fresh thread calls commit() with a healthy updater. Its entry is
enqueued AFTER the
+ * stuck entry in FIFO order.
+ * 3. The fresh thread becomes leader and iterates the queue in FIFO order.
+ *
+ * Pre-fix behavior (the bug): the stuck entry's updater threw, iteration
stopped, the fresh
+ * thread's commit() exited with that exception, but its OWN queued entry
was never iterated and
+ * stayed in _pending. A subsequent leader applied it. The fresh thread's
caller observed "my
+ * update failed" and ran cleanup (removing the newly-created segment's ZK
metadata), yet the
+ * subsequent leader's CAS put the segment in IdealState -- producing the
orphan.
+ *
+ * Post-fix behavior (asserted here): the catch in commit() sets _cancelled
on the fresh
+ * thread's own entry (it does NOT add it to `processed`). The fresh thread's
commit() still throws
+ * (pre-fix all-or-nothing batch semantics are preserved), but a subsequent
leader's iteration
+ * sees the cancelled entry, skips it, and removes it. No orphan can form
because the fresh
+ * updater is never applied to IdealState by any future batch.
+ */
+ @Test
+ public void testFreshUpdaterNotAppliedAfterCallerThrows()
+ throws Exception {
+ String tableName = TABLE_NAME_PREFIX + "race_OFFLINE";
+ IdealState initialState = new IdealState(tableName);
+ initialState.setStateModelDefRef("OnlineOffline");
+ initialState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+ initialState.setReplicas("1");
+ initialState.setNumPartitions(0);
+
TEST_INSTANCE.getHelixAdmin().addResource(TEST_INSTANCE.getHelixClusterName(),
tableName, initialState);
+
+ try {
+ IdealStateGroupCommit commit = new IdealStateGroupCommit();
+
+ // Step 1: Pre-populate the queue with a stuck entry whose updater throws
+ // PermanentUpdaterException. Simulates a queued entry for a pauseless
segment whose 300s
+ // deadline has expired.
+ injectStuckEntry(commit, tableName);
+
+ // Step 2: Fresh thread submits a healthy updater that adds a partition.
+ Function<IdealState, IdealState> freshUpdater = is -> {
+ is.setPartitionState("freshPartition", "instance1", "ONLINE");
+ return is;
+ };
+
+ Throwable freshException = null;
+ try {
+ commit.commit(TEST_INSTANCE.getHelixManager(), tableName, freshUpdater,
+ RetryPolicies.noDelayRetryPolicy(1), false);
+ } catch (Throwable e) {
+ freshException = e;
+ }
+
+ LOGGER.info("Fresh thread's commit() returned with exception: {}",
+ freshException == null ? "(none)"
+ : freshException.getClass().getSimpleName() + ": " +
freshException.getMessage());
+
+ // The fresh thread's commit() throws (the all-or-nothing batch
semantics are preserved).
+ Assert.assertNotNull(freshException,
+ "Fresh thread's commit() should throw because the batched IdealState
commit aborts when "
+ + "the co-batched stuck entry's lambda throws
PermanentUpdaterException.");
+
+ // Step 3: A drainer commit runs. If the fresh thread's entry was still
in _pending and
+ // NOT cancelled (the pre-fix bug), the drainer would iterate it and
write freshPartition
+ // into IdealState -- creating the orphan when paired with a caller that
has already
+ // cleaned up new segment metadata. With the cancellation fix, the entry
is skipped.
+ Function<IdealState, IdealState> drainerUpdater = is -> {
+ is.setPartitionState("drainerPartition", "instance1", "ONLINE");
+ return is;
+ };
+ commit.commit(TEST_INSTANCE.getHelixManager(), tableName, drainerUpdater,
+ RetryPolicies.noDelayRetryPolicy(1), false);
+
+ IdealState finalState =
HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), tableName);
+ Map<String, String> freshMap =
finalState.getInstanceStateMap("freshPartition");
+ Map<String, String> drainerMap =
finalState.getInstanceStateMap("drainerPartition");
+
+ LOGGER.info("Final IdealState partitions: {}",
finalState.getPartitionSet());
+ LOGGER.info("freshPartition state map: {}", freshMap);
+ LOGGER.info("drainerPartition state map: {}", drainerMap);
+
+ // Sanity: the drainer's own change must be present (proves the drainer
commit ran).
+ Assert.assertNotNull(drainerMap, "Drainer commit should have written
drainerPartition.");
+ Assert.assertEquals(drainerMap.get("instance1"), "ONLINE");
+
+ // The key assertion: the fresh updater's change must NOT be in
IdealState. The cancellation
+ // flag must have caused the drainer's iteration to skip and remove the
fresh entry. If this
+ // assertion fails, the cancellation fix has regressed and the
orphan-creation race can
+ // reoccur (caller threw, ran cleanup, but subsequent leader applied the
update anyway).
+ Assert.assertNull(freshMap,
+ "Fresh updater's change must NOT be in IdealState. The fresh
thread's commit() threw, "
+ + "indicating to its caller that the update did not happen; a
subsequent leader "
+ + "must not have applied it. Found: " + freshMap);
+
+ LOGGER.info("=== FIX VERIFIED: caller threw, no future leader applied
the cancelled entry ===");
+ } finally {
+
TEST_INSTANCE.getHelixAdmin().dropResource(TEST_INSTANCE.getHelixClusterName(),
tableName);
+ }
+ }
+
+ /**
+ * Multi-thread consistency test: N stuck owners + M fresh owners race to
commit on the same
+ * resource. The post-fix contract is per-owner consistency, not per-owner
success: every owner
+ * either observes a successful commit AND its update is present in
IdealState, OR observes an
+ * exception AND its update is NOT present in IdealState. The cancellation
flag must ensure no
+ * orphan can be left behind by any failed leader. The succeeded/threw split among fresh owners is timing-dependent and is only logged, not asserted.
+ */
+ @Test
+ public void testMultipleStuckAndFreshConsistency()
+ throws Exception {
+ String tableName = TABLE_NAME_PREFIX + "multi_OFFLINE";
+ IdealState initialState = new IdealState(tableName);
+ initialState.setStateModelDefRef("OnlineOffline");
+ initialState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+ initialState.setReplicas("1");
+ initialState.setNumPartitions(0);
+
TEST_INSTANCE.getHelixAdmin().addResource(TEST_INSTANCE.getHelixClusterName(),
tableName, initialState);
+
+ int stuckCount = 3;
+ int freshCount = 5;
+ IdealStateGroupCommit commit = new IdealStateGroupCommit();
+ ExecutorService pool = Executors.newFixedThreadPool(stuckCount +
freshCount);
+
+ try {
+ CountDownLatch allReady = new CountDownLatch(stuckCount + freshCount);
+ CountDownLatch goSignal = new CountDownLatch(1);
+
+ List<AtomicReference<Throwable>> stuckResults = new ArrayList<>();
+ for (int i = 0; i < stuckCount; i++) {
+ AtomicReference<Throwable> result = new AtomicReference<>();
+ stuckResults.add(result);
+ final int idx = i;
+ pool.submit(() -> {
+ allReady.countDown();
+ try {
+ goSignal.await();
+ } catch (InterruptedException ignored) { // NOTE(review): interrupt status is not restored; the task still proceeds to commit()
+ }
+ Function<IdealState, IdealState> stuckUpdater = is -> {
+ throw new HelixHelper.PermanentUpdaterException("stuck-" + idx);
+ };
+ try {
+ commit.commit(TEST_INSTANCE.getHelixManager(), tableName,
stuckUpdater,
+ RetryPolicies.noDelayRetryPolicy(1), false);
+ } catch (Throwable t) {
+ result.set(t);
+ }
+ });
+ }
+
+ List<AtomicReference<Throwable>> freshResults = new ArrayList<>();
+ for (int i = 0; i < freshCount; i++) {
+ AtomicReference<Throwable> result = new AtomicReference<>();
+ freshResults.add(result);
+ final int idx = i;
+ pool.submit(() -> {
+ allReady.countDown();
+ try {
+ goSignal.await();
+ } catch (InterruptedException ignored) { // NOTE(review): interrupt status is not restored; the task still proceeds to commit()
+ }
+ Function<IdealState, IdealState> freshUpdater = is -> {
+ is.setPartitionState("freshPart-" + idx, "instance1", "ONLINE");
+ return is;
+ };
+ try {
+ commit.commit(TEST_INSTANCE.getHelixManager(), tableName,
freshUpdater,
+ RetryPolicies.noDelayRetryPolicy(1), false);
+ } catch (Throwable t) {
+ result.set(t);
+ }
+ });
+ }
+
+ Assert.assertTrue(allReady.await(10, TimeUnit.SECONDS), "Worker threads
failed to start");
+ goSignal.countDown();
+ pool.shutdown();
+ Assert.assertTrue(pool.awaitTermination(30, TimeUnit.SECONDS), "Workers
did not finish");
+
+ // After the race, a drainer commit ensures any
successful-but-not-yet-applied entries are
+ // CAS-written, and any cancelled entries are skipped+removed. This
makes the final
+ // IdealState the ground truth for which fresh updaters actually took
effect.
+ commit.commit(TEST_INSTANCE.getHelixManager(), tableName, is -> {
+ is.setPartitionState("drainer", "instance1", "ONLINE");
+ return is;
+ }, RetryPolicies.noDelayRetryPolicy(1), false);
+
+ IdealState finalState =
HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), tableName);
+
+ // Every stuck owner must observe a throw. The stuck lambda always throws
+ // PermanentUpdaterException, so the batch they participated in always
aborts.
+ for (int i = 0; i < stuckCount; i++) {
+ Throwable t = stuckResults.get(i).get();
+ Assert.assertNotNull(t, "Stuck owner " + i + " should have thrown");
+ }
+
+ // Consistency check for every fresh owner: observed outcome must match
IdealState.
+ int freshSucceeded = 0;
+ int freshThrew = 0;
+ for (int i = 0; i < freshCount; i++) {
+ Throwable t = freshResults.get(i).get();
+ Map<String, String> inIs = finalState.getInstanceStateMap("freshPart-"
+ i);
+ if (t == null) {
+ freshSucceeded++;
+ Assert.assertNotNull(inIs,
+ "Fresh owner " + i + " observed success but its partition is NOT
in IdealState. "
+ + "Owner outcome and IdealState are inconsistent.");
+ Assert.assertEquals(inIs.get("instance1"), "ONLINE");
+ } else {
+ freshThrew++;
+ Assert.assertNull(inIs,
+ "Fresh owner " + i + " observed exception [" + t.getMessage()
+ + "] but its partition IS in IdealState. This is the
orphan-creating race: "
+ + "the cancellation flag must skip a failed leader's entry
so no subsequent "
+ + "batch applies it.");
+ }
+ }
+
+ LOGGER.info("=== Consistency verified: {} stuck threw, {} fresh
succeeded, {} fresh threw ===",
+ stuckCount, freshSucceeded, freshThrew);
+ } finally {
+ if (!pool.isShutdown()) {
+ pool.shutdownNow();
+ }
+
TEST_INSTANCE.getHelixAdmin().dropResource(TEST_INSTANCE.getHelixClusterName(),
tableName);
+ }
+ }
+
+ /**
+ * Integration-style test that walks the same chain the orphan-creating
production bug walked:
+ *
+ * - Step 2 of commitSegmentMetadataInternal: write the new consuming
segment's ZK metadata
+ * to the property store with status IN_PROGRESS.
+ * - Step 3: update IdealState to mark the old segment ONLINE and add the
new segment as
+ * CONSUMING -- via IdealStateGroupCommit.commit().
+ * - If commit() throws, simulate the catch block in
commitSegmentMetadataInternal: call
+ * removeSegmentZKMetadataBestEffort on the new segment (this test calls ZKMetadataProvider.removeSegmentZKMetadata directly).
+ * - Then let a drainer commit run (mimics any subsequent batch picking up
the queue).
+ *
+ * Orphan condition: the new segment ends up in IdealState but its ZK
metadata is gone.
+ * Pre-fix: a co-batched stuck pauseless entry threw, causing this caller's
commit() to throw
+ * (so cleanup ran) while the caller's own updater stayed in _pending and
was applied by the
+ * next leader -- producing the orphan.
+ *
+ * The test asserts that no orphan is produced. With the cancellation fix it
passes -- the
+ * caller still throws (preserving pre-fix all-or-nothing batch semantics),
but the entry is
+ * cancelled so the drainer never applies it.
+ */
+ @Test
+ public void testNoOrphanWhenCoBatchedEntryThrowsDuringStepThree()
+ throws Exception {
+ String tableName = TABLE_NAME_PREFIX + "orphan_REALTIME";
+ String oldSegment = "orphanTest__0__0__T0000Z";
+ String newSegment = "orphanTest__0__1__T0010Z";
+ String instance = "instance1";
+
+ IdealState initialState = new IdealState(tableName);
+ initialState.setStateModelDefRef("OnlineOffline");
+ initialState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+ initialState.setReplicas("1");
+ initialState.setNumPartitions(0);
+ initialState.setPartitionState(oldSegment, instance, "CONSUMING");
+
TEST_INSTANCE.getHelixAdmin().addResource(TEST_INSTANCE.getHelixClusterName(),
tableName, initialState);
+
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
TEST_INSTANCE.getPropertyStore();
+
+ try {
+ IdealStateGroupCommit commit = new IdealStateGroupCommit();
+
+ // Step 2 (mimicked): persist new segment's ZK metadata as IN_PROGRESS.
+ SegmentZKMetadata newSegmentMetadata = new SegmentZKMetadata(newSegment);
+ newSegmentMetadata.setStatus(Status.IN_PROGRESS);
+ boolean wrote = ZKMetadataProvider.setSegmentZKMetadata(propertyStore,
tableName, newSegmentMetadata, -1);
+ Assert.assertTrue(wrote, "Pre-condition: writing new segment ZK metadata
should succeed");
+
+ // Co-batched in-flight throwing entry: mimics a pauseless segment that
has timed out
+ // and whose queued updater will throw PermanentUpdaterException when
iterated.
+ injectStuckEntry(commit, tableName);
+
+ // Step 3 updater: flip old segment to ONLINE, add new segment as
CONSUMING.
+ Function<IdealState, IdealState> stepThreeUpdater = is -> {
+ is.setPartitionState(oldSegment, instance, "ONLINE");
+ is.setPartitionState(newSegment, instance, "CONSUMING");
+ return is;
+ };
+
+ Throwable callerThrew = null;
+ try {
+ commit.commit(TEST_INSTANCE.getHelixManager(), tableName,
stepThreeUpdater,
+ RetryPolicies.noDelayRetryPolicy(1), false);
+ } catch (Throwable t) {
+ callerThrew = t;
+ // Mimic commitSegmentMetadataInternal's catch: delete the new
segment's ZK metadata
+ // best-effort because Step 3 appeared to fail. The log text names the production helper, but the call here is ZKMetadataProvider.removeSegmentZKMetadata.
+ ZKMetadataProvider.removeSegmentZKMetadata(propertyStore, tableName,
newSegment);
+ LOGGER.info("Caller observed exception, ran
removeSegmentZKMetadataBestEffort(newSegment): {}",
+ t.getMessage());
+ }
+
+ // Drainer commit: mimics any subsequent IdealStateGroupCommit batch
that processes
+ // whatever is left in the queue. If our caller's updater were still
queued (the pre-fix
+ // bug), the drainer would apply it here -- writing newSegment into
IdealState even
+ // though we already deleted its ZK metadata. With the cancellation fix,
the drainer's
+ // iteration skips the cancelled entry.
+ Function<IdealState, IdealState> drainer = is -> {
+ is.setPartitionState("drainerPartition", instance, "ONLINE");
+ return is;
+ };
+ commit.commit(TEST_INSTANCE.getHelixManager(), tableName, drainer,
+ RetryPolicies.noDelayRetryPolicy(1), false);
+
+ // Inspect ground truth.
+ IdealState finalState =
HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), tableName);
+ Map<String, String> newSegInIs =
finalState.getInstanceStateMap(newSegment);
+ SegmentZKMetadata newSegMetadataAfter =
ZKMetadataProvider.getSegmentZKMetadata(
+ propertyStore, tableName, newSegment);
+
+ boolean inIdealState = newSegInIs != null;
+ boolean hasZkMetadata = newSegMetadataAfter != null;
+
+ LOGGER.info("Caller threw: {}", callerThrew != null);
+ LOGGER.info("newSegment in IdealState: {}", inIdealState);
+ LOGGER.info("newSegment has ZK metadata: {}", hasZkMetadata);
+
+ // The orphan condition is precisely "in IdealState && no ZK metadata".
+ // Pre-fix: this assertion FAILS -- caller threw, ran cleanup, but the
queued updater was
+ // applied by the drainer.
+ // Post-fix (cancellation): the entry is cancelled in commit()'s catch;
the drainer skips
+ // it. The newSegment is neither in IdealState nor has ZK metadata -- a
clean failure.
+ Assert.assertFalse(inIdealState && !hasZkMetadata,
+ "ORPHAN DETECTED: newSegment is in IdealState but has no ZK
metadata. "
+ + "callerThrew=" + (callerThrew != null ?
callerThrew.getMessage() : "null")
+ + ". The cancellation fix in IdealStateGroupCommit must skip the
caller's "
+ + "still-queued entry so no subsequent batch applies it.");
+
+ // Post-fix expected state: caller threw AND newSegment is neither in
IdealState nor has
+ // ZK metadata. The cancellation prevented the queued entry from being
applied.
+ Assert.assertNotNull(callerThrew,
+ "Post-fix expectation: caller's commit() throws (all-or-nothing
batch semantics).");
+ Assert.assertFalse(inIdealState,
+ "newSegment must NOT be in IdealState -- the caller's cancelled
entry must be skipped "
+ + "by the drainer's iteration.");
+ Assert.assertFalse(hasZkMetadata,
+ "newSegment's ZK metadata must have been cleaned up by the caller's
catch.");
+ } finally {
+ // Best-effort cleanup of any stragglers. ControllerTest.cleanup()
asserts /SEGMENTS has
+ // zero child table directories, so we must remove the table-level node
too.
+ try {
+ ZKMetadataProvider.removeSegmentZKMetadata(propertyStore, tableName,
newSegment);
+ } catch (Throwable ignored) {
+ }
+ try {
+ propertyStore.remove("/SEGMENTS/" + tableName,
org.apache.helix.AccessOption.PERSISTENT);
+ } catch (Throwable ignored) {
+ }
+
TEST_INSTANCE.getHelixAdmin().dropResource(TEST_INSTANCE.getHelixClusterName(),
tableName);
+ }
+ }
+
+ /**
+ * Uses reflection to push a stuck Entry (whose updater throws
PermanentUpdaterException) into
+ * IdealStateGroupCommit's internal per-resource queue, without going
through commit(). This is
+ * how we make the FIFO race deterministic in a unit test. Relies on private internals (the _queues array, Queue._pending, and the Entry(String, Function) constructor); no thread ever waits on the injected entry.
+ */
+ @SuppressWarnings("unchecked")
+ private static void injectStuckEntry(IdealStateGroupCommit commit, String
resourceName)
+ throws Exception {
+ Class<?> queueClass =
+
Class.forName("org.apache.pinot.common.utils.helix.IdealStateGroupCommit$Queue");
+ Class<?> entryClass =
+
Class.forName("org.apache.pinot.common.utils.helix.IdealStateGroupCommit$Entry");
+
+ Field queuesField =
IdealStateGroupCommit.class.getDeclaredField("_queues");
+ queuesField.setAccessible(true);
+ Object[] queues = (Object[]) queuesField.get(commit);
+
+ int bucket = (resourceName.hashCode() & Integer.MAX_VALUE) % queues.length; // NOTE(review): assumed to mirror IdealStateGroupCommit's own queue selection -- keep in sync
+ Object queue = queues[bucket];
+
+ Field pendingField = queueClass.getDeclaredField("_pending");
+ pendingField.setAccessible(true);
+ ConcurrentLinkedQueue<Object> pending = (ConcurrentLinkedQueue<Object>)
pendingField.get(queue);
+
+ Function<IdealState, IdealState> stuckUpdater = is -> {
+ throw new HelixHelper.PermanentUpdaterException(
+ "simulated exceeded max segment completion time");
+ };
+
+ Constructor<?> entryCtor = entryClass.getDeclaredConstructor(String.class,
Function.class);
+ entryCtor.setAccessible(true);
+ Object stuckEntry = entryCtor.newInstance(resourceName, stuckUpdater);
+
+ pending.add(stuckEntry);
+ LOGGER.info("Injected stuck (always-throwing) entry into queue bucket {}
for resource {}",
+ bucket, resourceName);
+ }
+
@Test(invocationCount = 5)
public void testGroupCommit()
throws InterruptedException {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]