This is an automated email from the ASF dual-hosted git repository. bereng pushed a commit to branch cassandra-4.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push: new 23ad7c3 Flaky ActiveRepairServiceTest.testRejectWhenPoolFullStrategy new 1676add Merge branch 'cassandra-4.0.0' into cassandra-4.0 23ad7c3 is described below commit 23ad7c301e227d5ea88cea0784b32e6351603912 Author: Bereng <berenguerbl...@gmail.com> AuthorDate: Fri Jun 11 07:52:05 2021 +0200 Flaky ActiveRepairServiceTest.testRejectWhenPoolFullStrategy patch by Berenguer Blasi; reviewed by Andres de la Peña and Michael Semb Wever for CASSANDRA-16685 --- .../cassandra/service/ActiveRepairServiceTest.java | 40 +++++++++++++++++----- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index d0a367a..ad680f5 100644 --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@ -18,19 +18,27 @@ */ package org.apache.cassandra.service; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; + import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -55,8 +63,8 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.repair.messages.RepairOption; -import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.Refs; import org.apache.cassandra.utils.concurrent.SimpleCondition; @@ -77,6 +85,7 @@ public class ActiveRepairServiceTest public static final String KEYSPACE5 = "Keyspace5"; public static final String CF_STANDARD1 = "Standard1"; public static final String CF_COUNTER = "Counter1"; + public static final int TASK_SECONDS = 10; public String cfname; public ColumnFamilyStore store; @@ -376,8 +385,20 @@ public class ActiveRepairServiceTest { Condition blocked = new SimpleCondition(); CountDownLatch completed = new CountDownLatch(2); + + /* + * CASSANDRA-16685 This is a Java bug. When the underlying executor's queue is a SynchronousQueue, there can + * be races just after the ThreadPool's initialization while juggling and spinning up threads internally + * leading to false rejections. That queue needs a thread ready to pick up the task immediately or it will + * produce a reject exception upon 'offer()' method call on the executor's code. If the executor is still + * initializing or threads are not ready to take work you can get false rejections. + * + * A sleep has been added to give time to the thread pool to be ready to get work. + */ + Thread.sleep(250); validationExecutor.submit(new Task(blocked, completed)); validationExecutor.submit(new Task(blocked, completed)); + try { validationExecutor.submit(new Task(blocked, completed)); @@ -387,10 +408,13 @@ public class ActiveRepairServiceTest { // expected } + // allow executing tests to complete blocked.signalAll(); - completed.await(10, TimeUnit.SECONDS); + completed.await(TASK_SECONDS + 1, TimeUnit.SECONDS); + // Submission is unblocked + Thread.sleep(250); validationExecutor.submit(() -> {}); } finally @@ -425,7 +449,7 @@ public class ActiveRepairServiceTest } // Make sure all tasks have been submitted to the validation executor - allSubmitted.await(10, TimeUnit.SECONDS); + allSubmitted.await(TASK_SECONDS + 1, TimeUnit.SECONDS); // Give the tasks we expect to execute immediately chance to be scheduled Util.spinAssertEquals(2 , ((DebuggableThreadPoolExecutor) validationExecutor)::getActiveTaskCount, 1); @@ -436,7 +460,7 @@ public class ActiveRepairServiceTest Assert.assertEquals(3, ((DebuggableThreadPoolExecutor) validationExecutor).getPendingTaskCount()); // allow executing tests to complete blocked.signalAll(); - completed.await(10, TimeUnit.SECONDS); + completed.await(TASK_SECONDS + 1, TimeUnit.SECONDS); } finally { @@ -458,7 +482,7 @@ public class ActiveRepairServiceTest public void run() { - Uninterruptibles.awaitUninterruptibly(blocked, 10, TimeUnit.SECONDS); + Uninterruptibles.awaitUninterruptibly(blocked, TASK_SECONDS, TimeUnit.SECONDS); complete.countDown(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org