This is an automated email from the ASF dual-hosted git repository. adelapena pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new a28d8f7 Fix RepairJobsTest.testNoTreesRetainedAfterDifference by waiting for latch a28d8f7 is described below commit a28d8f7590cc98ef25eced4b2968c577d0156e50 Author: Zhao Yang <zhaoyangsingap...@gmail.com> AuthorDate: Mon Jun 22 17:34:55 2020 +0100 Fix RepairJobsTest.testNoTreesRetainedAfterDifference by waiting for latch patch by Zhao Yang; reviewed by Caleb Rackliffe for CASSANDRA-15872 --- .../org/apache/cassandra/repair/RepairJobTest.java | 39 ++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java b/test/unit/org/apache/cassandra/repair/RepairJobTest.java index 5713898..d3af58f 100644 --- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -58,11 +60,13 @@ import org.apache.cassandra.repair.messages.SyncRequest; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.SessionSummary; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTree; import org.apache.cassandra.utils.MerkleTrees; import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.asserts.SyncTaskListAssert; @@ -91,7 +95,7 @@ public class RepairJobTest private static InetAddressAndPort addr3; private static InetAddressAndPort addr4; private static InetAddressAndPort addr5; - private RepairSession session; + private MeasureableRepairSession session; private RepairJob job; private RepairJobDesc sessionJobDesc; @@ -99,6 +103,8 @@ public class RepairJobTest // memory retention from CASSANDRA-14096 private static class MeasureableRepairSession extends RepairSession { + private final List<Callable<?>> syncCompleteCallbacks = new ArrayList<>(); + public MeasureableRepairSession(UUID parentRepairSession, UUID id, CommonRange commonRange, String keyspace, RepairParallelism parallelismDegree, boolean isIncremental, boolean pullRepair, boolean force, PreviewKind previewKind, boolean optimiseStreams, String... cfnames) @@ -110,8 +116,32 @@ public class RepairJobTest { DebuggableThreadPoolExecutor executor = super.createExecutor(); executor.setKeepAliveTime(THREAD_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); - return executor; } + return executor; + } + + @Override + public void syncComplete(RepairJobDesc desc, SyncNodePair nodes, boolean success, List<SessionSummary> summaries) + { + for (Callable<?> callback : syncCompleteCallbacks) + { + try + { + callback.call(); + } + catch (Exception e) + { + throw Throwables.cleaned(e); + } + } + super.syncComplete(desc, nodes, success, summaries); + } + + public void registerSyncCompleteCallback(Callable<?> callback) + { + syncCompleteCallbacks.add(callback); + } } + @BeforeClass public static void setupClass() throws UnknownHostException { @@ -221,10 +251,15 @@ public class RepairJobTest // SyncTasks themselves should not contain significant memory SyncTaskListAssert.assertThat(syncTasks).hasSizeLessThan(0.2 * singleTreeSize); + // block syncComplete execution until test has verified session still retains the trees + CompletableFuture<?> future = new CompletableFuture<>(); + session.registerSyncCompleteCallback(future::get); ListenableFuture<List<SyncStat>> syncResults = job.executeTasks(syncTasks); // Immediately following execution the internal execution queue should still retain the trees assertThat(ObjectSizes.measureDeep(session)).isGreaterThan(singleTreeSize); + // unblock syncComplete callback, session should remove trees + future.complete(null); // The session retains memory in the contained executor until the threads expire, so we wait for the threads // that ran the Tree -> SyncTask conversions to die and release the memory --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org