apoorvmittal10 commented on code in PR #17636: URL: https://github.com/apache/kafka/pull/17636#discussion_r1829909635
########## server-common/src/test/java/org/apache/kafka/server/purgatory/DelayedOperationTest.java: ########## @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.purgatory; + +import org.apache.kafka.common.utils.Time; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + 
+public class DelayedOperationTest { + + private final MockKey test1 = new MockKey("test1"); + private final MockKey test2 = new MockKey("test2"); + private final MockKey test3 = new MockKey("test3"); + private final Random random = new Random(); + private DelayedOperationPurgatory<DelayedOperation> purgatory; + private ScheduledExecutorService executorService; + + @BeforeEach + public void setUp() { + purgatory = new DelayedOperationPurgatory<>("mock", 0); + } + + @AfterEach + public void tearDown() throws Exception { + purgatory.shutdown(); + if (executorService != null) + executorService.shutdown(); + } + + static class MockKey implements DelayedOperationKey { + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MockKey mockKey = (MockKey) o; + return Objects.equals(key, mockKey.key); + } + + @Override + public int hashCode() { + return key != null ? key.hashCode() : 0; + } + + final String key; + + MockKey(String key) { + this.key = key; + } + + @Override + public String keyLabel() { + return key; + } + } + + @Test + public void testLockInTryCompleteElseWatch() { + DelayedOperation op = new DelayedOperation(100000L) { + @Override + public void onExpiration() {} + @Override + public void onComplete() {} + @Override + public boolean tryComplete() { + assertTrue(((ReentrantLock) lock).isHeldByCurrentThread()); + return false; + } + @Override + public boolean safeTryComplete() { + fail("tryCompleteElseWatch should not use safeTryComplete"); + return super.safeTryComplete(); + } + }; + purgatory.tryCompleteElseWatch(op, Collections.singletonList(new MockKey("key"))); + } + + DelayedOperation op(boolean shouldComplete) { + return new DelayedOperation(100000L) { + @Override + public void onExpiration() {} + + @Override + public void onComplete() {} + + @Override + public boolean tryComplete() { + assertTrue(((ReentrantLock) lock).isHeldByCurrentThread()); + return shouldComplete; + 
} + }; + } + + @Test + public void testSafeTryCompleteOrElse() { + final AtomicBoolean pass = new AtomicBoolean(); + assertFalse(op(false).safeTryCompleteOrElse(() -> pass.set(true))); + assertTrue(pass.get()); + assertTrue(op(true).safeTryCompleteOrElse(() -> fail("this method should NOT be executed"))); + } + + @Test + public void testRequestSatisfaction() { + MockDelayedOperation r1 = new MockDelayedOperation(100000L); + MockDelayedOperation r2 = new MockDelayedOperation(100000L); + assertEquals(0, purgatory.checkAndComplete(test1), "With no waiting requests, nothing should be satisfied"); + assertFalse(purgatory.tryCompleteElseWatch(r1, Collections.singletonList(new MockKey("test1"))), "r1 not satisfied and hence watched"); + assertEquals(0, purgatory.checkAndComplete(test1), "Still nothing satisfied"); + assertFalse(purgatory.tryCompleteElseWatch(r2, Collections.singletonList(new MockKey("test2"))), "r2 not satisfied and hence watched"); + assertEquals(0, purgatory.checkAndComplete(test2), "Still nothing satisfied"); + r1.completable = true; + assertEquals(1, purgatory.checkAndComplete(test1), "r1 satisfied"); + assertEquals(0, purgatory.checkAndComplete(test1), "Nothing satisfied"); + r2.completable = true; + assertEquals(1, purgatory.checkAndComplete(test2), "r2 satisfied"); + assertEquals(0, purgatory.checkAndComplete(test2), "Nothing satisfied"); + } + + @Test + public void testRequestExpiry() throws Exception { + long expiration = 20L; + long start = Time.SYSTEM.hiResClockMs(); + MockDelayedOperation r1 = new MockDelayedOperation(expiration); + MockDelayedOperation r2 = new MockDelayedOperation(200000L); + assertFalse(purgatory.tryCompleteElseWatch(r1, Collections.singletonList(test1)), "r1 not satisfied and hence watched"); + assertFalse(purgatory.tryCompleteElseWatch(r2, Collections.singletonList(test2)), "r2 not satisfied and hence watched"); + r1.awaitExpiration(); + long elapsed = Time.SYSTEM.hiResClockMs() - start; + assertTrue(r1.isCompleted(), "r1 
completed due to expiration"); + assertFalse(r2.isCompleted(), "r2 hasn't completed"); + assertTrue(elapsed >= expiration, "Time for expiration " + elapsed + " should at least " + expiration); + } + + @Test + public void testRequestPurge() { + MockDelayedOperation r1 = new MockDelayedOperation(100000L); + MockDelayedOperation r2 = new MockDelayedOperation(100000L); + MockDelayedOperation r3 = new MockDelayedOperation(100000L); + purgatory.tryCompleteElseWatch(r1, Collections.singletonList(test1)); + purgatory.tryCompleteElseWatch(r2, Arrays.asList(test1, test2)); + purgatory.tryCompleteElseWatch(r3, Arrays.asList(test1, test2, test3)); + + assertEquals(3, purgatory.numDelayed(), "Purgatory should have 3 total delayed operations"); + assertEquals(6, purgatory.watched(), "Purgatory should have 6 watched elements"); + + // complete the operations, it should immediately be purged from the delayed operation + r2.completable = true; + r2.tryComplete(); + assertEquals(2, purgatory.numDelayed(), "Purgatory should have 2 total delayed operations instead of " + purgatory.numDelayed()); + + r3.completable = true; + r3.tryComplete(); + assertEquals(1, purgatory.numDelayed(), "Purgatory should have 1 total delayed operations instead of " + purgatory.numDelayed()); + + // checking a watch should purge the watch list + purgatory.checkAndComplete(test1); + assertEquals(4, purgatory.watched(), "Purgatory should have 4 watched elements instead of " + purgatory.watched()); + + purgatory.checkAndComplete(test2); + assertEquals(2, purgatory.watched(), "Purgatory should have 2 watched elements instead of " + purgatory.watched()); + + purgatory.checkAndComplete(test3); + assertEquals(1, purgatory.watched(), "Purgatory should have 1 watched elements instead of " + purgatory.watched()); + } + + @Test + public void shouldCancelForKeyReturningCancelledOperations() { + purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Collections.singletonList(test1)); + 
purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Collections.singletonList(test1)); + purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Collections.singletonList(test2)); + + List<DelayedOperation> cancelledOperations = purgatory.cancelForKey(test1); + assertEquals(2, cancelledOperations.size()); + assertEquals(1, purgatory.numDelayed()); + assertEquals(1, purgatory.watched()); + } + + @Test + public void shouldReturnNilOperationsOnCancelForKeyWhenKeyDoesntExist() { + List<DelayedOperation> cancelledOperations = purgatory.cancelForKey(test1); + assertTrue(cancelledOperations.isEmpty()); + } + + /** + * Test `tryComplete` with multiple threads to verify that there are no timing windows + * when completion is not performed even if the thread that makes the operation completable + * may not be able to acquire the operation lock. Since it is difficult to test all scenarios, + * this test uses random delays with a large number of threads. + */ + @Test + public void testTryCompleteWithMultipleThreads() throws ExecutionException, InterruptedException { + executorService = Executors.newScheduledThreadPool(20); + int maxDelayMs = 10; + int completionAttempts = 20; + List<TestDelayOperation> ops = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + TestDelayOperation op = new TestDelayOperation(i, completionAttempts, maxDelayMs); + purgatory.tryCompleteElseWatch(op, Collections.singletonList(op.key)); + ops.add(op); + } + + List<Future<?>> futures = new ArrayList<>(); + for (int i = 1; i <= completionAttempts; i++) { + for (TestDelayOperation op : ops) { + futures.add(scheduleTryComplete(executorService, op, random.nextInt(maxDelayMs))); + } + } + for (Future<?> future : futures) { + future.get(); + } + ops.forEach(op -> assertTrue(op.isCompleted(), "Operation " + op.key.keyLabel() + " should have completed")); + } + + Future<?> scheduleTryComplete(ScheduledExecutorService executorService, TestDelayOperation op, long delayMs) { + return 
executorService.schedule(() -> { + if (op.completionAttemptsRemaining.decrementAndGet() == 0) { + op.completable = true; + } + purgatory.checkAndComplete(op.key); + }, delayMs, TimeUnit.MILLISECONDS); + } + + static class MockDelayedOperation extends DelayedOperation { Review Comment: Can this be private? ########## server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.purgatory; + +import org.apache.kafka.server.util.timer.TimerTask; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * An operation whose processing needs to be delayed for at most the given delayMs. For example + * a delayed produce operation could be waiting for specified number of acks; or + * a delayed fetch operation could be waiting for a given number of bytes to accumulate. + * <br/> + * The logic upon completing a delayed operation is defined in onComplete() and will be called exactly once. + * Once an operation is completed, isCompleted() will return true. 
onComplete() can be triggered by either + * forceComplete(), which forces calling onComplete() after delayMs if the operation is not yet completed, + * or tryComplete(), which first checks if the operation can be completed or not now, and if yes calls + * forceComplete(). + * <br/> + * A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete(). + * <br/> + * Noted that if you add a future delayed operation that calls ReplicaManager.appendRecords() in onComplete() + * like DelayedJoin, you must be aware that this operation's onExpiration() needs to call actionQueue.tryCompleteAction(). + */ +public abstract class DelayedOperation extends TimerTask { + + private final AtomicBoolean completed = new AtomicBoolean(false); + // Visible for testing + final Lock lock; + + public DelayedOperation(long delayMs, Optional<Lock> lockOpt) { + this(delayMs, lockOpt.orElse(new ReentrantLock())); + } + + public DelayedOperation(long delayMs) { + this(delayMs, new ReentrantLock()); + } + + public DelayedOperation(long delayMs, Lock lock) { + super(delayMs); + this.lock = lock; + } + + /* + * Force completing the delayed operation, if not already completed. + * This function can be triggered when + * + * 1. The operation has been verified to be completable inside tryComplete() + * 2. 
The operation has expired and hence needs to be completed right now + * + * Return true iff the operation is completed by the caller: note that + * concurrent threads can try to complete the same operation, but only + * the first thread will succeed in completing the operation and return + * true, others will still return false + */ + public boolean forceComplete() { + if (completed.compareAndSet(false, true)) { + // cancel the timeout timer + cancel(); + onComplete(); + return true; + } else { + return false; + } + } + + /** + * Check if the delayed operation is already completed + */ + public boolean isCompleted() { + return completed.get(); + } + + /** + * Call-back to execute when a delayed operation gets expired and hence forced to complete. + */ + public abstract void onExpiration(); + + /** + * Process for completing an operation; This function needs to be defined + * in subclasses and will be called exactly once in forceComplete() + */ + public abstract void onComplete(); + + /** + * Try to complete the delayed operation by first checking if the operation + * can be completed by now. 
If yes execute the completion logic by calling + * forceComplete() and return true iff forceComplete returns true; otherwise return false + * <br/> + * This function needs to be defined in subclasses + */ + public abstract boolean tryComplete(); + + /** + * Thread-safe variant of tryComplete() and call extra function if first tryComplete returns false + * @param action else function to be executed after first tryComplete returns false + * @return result of tryComplete + */ + boolean safeTryCompleteOrElse(Action action) { + lock.lock(); + try { + if (tryComplete()) return true; + else { + action.run(); + // last completion check + return tryComplete(); + } + } finally { + lock.unlock(); + } + } + + /** + * Thread-safe variant of tryComplete() + */ + boolean safeTryComplete() { + lock.lock(); + try { + if (isCompleted()) return false; + else return tryComplete(); + } finally { + lock.unlock(); + } + } + + /** + * run() method defines a task that is executed on timeout + */ + @Override + public void run() { + if (forceComplete()) + onExpiration(); + } + + @FunctionalInterface + public interface Action { + void run(); Review Comment: nit: Should we name this method `apply`, to be more consistent with the functional interfaces in `java.util.function`? ```suggestion void apply(); ``` ########## server-common/src/test/java/org/apache/kafka/server/purgatory/DelayedOperationTest.java: ########## @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.purgatory; + +import org.apache.kafka.common.utils.Time; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class DelayedOperationTest { + + private final MockKey test1 = new MockKey("test1"); + private final MockKey test2 = new MockKey("test2"); + private final MockKey test3 = new MockKey("test3"); + private final Random random = new Random(); + private DelayedOperationPurgatory<DelayedOperation> purgatory; + private ScheduledExecutorService executorService; + + @BeforeEach + public void setUp() { + purgatory = new DelayedOperationPurgatory<>("mock", 0); + } + + @AfterEach + public void tearDown() throws 
Exception { + purgatory.shutdown(); + if (executorService != null) + executorService.shutdown(); + } + + static class MockKey implements DelayedOperationKey { + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MockKey mockKey = (MockKey) o; + return Objects.equals(key, mockKey.key); + } + + @Override + public int hashCode() { + return key != null ? key.hashCode() : 0; + } + + final String key; + + MockKey(String key) { + this.key = key; + } + + @Override + public String keyLabel() { + return key; + } + } + + @Test + public void testLockInTryCompleteElseWatch() { + DelayedOperation op = new DelayedOperation(100000L) { + @Override + public void onExpiration() {} + @Override + public void onComplete() {} + @Override + public boolean tryComplete() { + assertTrue(((ReentrantLock) lock).isHeldByCurrentThread()); + return false; + } + @Override + public boolean safeTryComplete() { + fail("tryCompleteElseWatch should not use safeTryComplete"); + return super.safeTryComplete(); + } + }; + purgatory.tryCompleteElseWatch(op, Collections.singletonList(new MockKey("key"))); + } + + DelayedOperation op(boolean shouldComplete) { + return new DelayedOperation(100000L) { + @Override + public void onExpiration() {} + + @Override + public void onComplete() {} + + @Override + public boolean tryComplete() { + assertTrue(((ReentrantLock) lock).isHeldByCurrentThread()); + return shouldComplete; + } + }; + } + + @Test + public void testSafeTryCompleteOrElse() { + final AtomicBoolean pass = new AtomicBoolean(); + assertFalse(op(false).safeTryCompleteOrElse(() -> pass.set(true))); + assertTrue(pass.get()); + assertTrue(op(true).safeTryCompleteOrElse(() -> fail("this method should NOT be executed"))); + } + + @Test + public void testRequestSatisfaction() { + MockDelayedOperation r1 = new MockDelayedOperation(100000L); + MockDelayedOperation r2 = new MockDelayedOperation(100000L); + assertEquals(0, 
purgatory.checkAndComplete(test1), "With no waiting requests, nothing should be satisfied"); + assertFalse(purgatory.tryCompleteElseWatch(r1, Collections.singletonList(new MockKey("test1"))), "r1 not satisfied and hence watched"); + assertEquals(0, purgatory.checkAndComplete(test1), "Still nothing satisfied"); + assertFalse(purgatory.tryCompleteElseWatch(r2, Collections.singletonList(new MockKey("test2"))), "r2 not satisfied and hence watched"); + assertEquals(0, purgatory.checkAndComplete(test2), "Still nothing satisfied"); + r1.completable = true; + assertEquals(1, purgatory.checkAndComplete(test1), "r1 satisfied"); + assertEquals(0, purgatory.checkAndComplete(test1), "Nothing satisfied"); + r2.completable = true; + assertEquals(1, purgatory.checkAndComplete(test2), "r2 satisfied"); + assertEquals(0, purgatory.checkAndComplete(test2), "Nothing satisfied"); + } + + @Test + public void testRequestExpiry() throws Exception { + long expiration = 20L; + long start = Time.SYSTEM.hiResClockMs(); + MockDelayedOperation r1 = new MockDelayedOperation(expiration); + MockDelayedOperation r2 = new MockDelayedOperation(200000L); + assertFalse(purgatory.tryCompleteElseWatch(r1, Collections.singletonList(test1)), "r1 not satisfied and hence watched"); + assertFalse(purgatory.tryCompleteElseWatch(r2, Collections.singletonList(test2)), "r2 not satisfied and hence watched"); + r1.awaitExpiration(); + long elapsed = Time.SYSTEM.hiResClockMs() - start; + assertTrue(r1.isCompleted(), "r1 completed due to expiration"); + assertFalse(r2.isCompleted(), "r2 hasn't completed"); + assertTrue(elapsed >= expiration, "Time for expiration " + elapsed + " should at least " + expiration); + } + + @Test + public void testRequestPurge() { + MockDelayedOperation r1 = new MockDelayedOperation(100000L); + MockDelayedOperation r2 = new MockDelayedOperation(100000L); + MockDelayedOperation r3 = new MockDelayedOperation(100000L); + purgatory.tryCompleteElseWatch(r1, Collections.singletonList(test1)); + 
purgatory.tryCompleteElseWatch(r2, Arrays.asList(test1, test2)); + purgatory.tryCompleteElseWatch(r3, Arrays.asList(test1, test2, test3)); + + assertEquals(3, purgatory.numDelayed(), "Purgatory should have 3 total delayed operations"); + assertEquals(6, purgatory.watched(), "Purgatory should have 6 watched elements"); + + // complete the operations, it should immediately be purged from the delayed operation + r2.completable = true; + r2.tryComplete(); + assertEquals(2, purgatory.numDelayed(), "Purgatory should have 2 total delayed operations instead of " + purgatory.numDelayed()); + + r3.completable = true; + r3.tryComplete(); + assertEquals(1, purgatory.numDelayed(), "Purgatory should have 1 total delayed operations instead of " + purgatory.numDelayed()); + + // checking a watch should purge the watch list + purgatory.checkAndComplete(test1); + assertEquals(4, purgatory.watched(), "Purgatory should have 4 watched elements instead of " + purgatory.watched()); + + purgatory.checkAndComplete(test2); + assertEquals(2, purgatory.watched(), "Purgatory should have 2 watched elements instead of " + purgatory.watched()); + + purgatory.checkAndComplete(test3); + assertEquals(1, purgatory.watched(), "Purgatory should have 1 watched elements instead of " + purgatory.watched()); + } + + @Test + public void shouldCancelForKeyReturningCancelledOperations() { + purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Collections.singletonList(test1)); + purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Collections.singletonList(test1)); + purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Collections.singletonList(test2)); + + List<DelayedOperation> cancelledOperations = purgatory.cancelForKey(test1); + assertEquals(2, cancelledOperations.size()); + assertEquals(1, purgatory.numDelayed()); + assertEquals(1, purgatory.watched()); + } + + @Test + public void shouldReturnNilOperationsOnCancelForKeyWhenKeyDoesntExist() { + List<DelayedOperation> 
cancelledOperations = purgatory.cancelForKey(test1); + assertTrue(cancelledOperations.isEmpty()); + } + + /** + * Test `tryComplete` with multiple threads to verify that there are no timing windows + * when completion is not performed even if the thread that makes the operation completable + * may not be able to acquire the operation lock. Since it is difficult to test all scenarios, + * this test uses random delays with a large number of threads. + */ + @Test + public void testTryCompleteWithMultipleThreads() throws ExecutionException, InterruptedException { + executorService = Executors.newScheduledThreadPool(20); + int maxDelayMs = 10; + int completionAttempts = 20; + List<TestDelayOperation> ops = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + TestDelayOperation op = new TestDelayOperation(i, completionAttempts, maxDelayMs); + purgatory.tryCompleteElseWatch(op, Collections.singletonList(op.key)); + ops.add(op); + } + + List<Future<?>> futures = new ArrayList<>(); + for (int i = 1; i <= completionAttempts; i++) { + for (TestDelayOperation op : ops) { + futures.add(scheduleTryComplete(executorService, op, random.nextInt(maxDelayMs))); + } + } + for (Future<?> future : futures) { + future.get(); + } + ops.forEach(op -> assertTrue(op.isCompleted(), "Operation " + op.key.keyLabel() + " should have completed")); + } + + Future<?> scheduleTryComplete(ScheduledExecutorService executorService, TestDelayOperation op, long delayMs) { + return executorService.schedule(() -> { + if (op.completionAttemptsRemaining.decrementAndGet() == 0) { + op.completable = true; + } + purgatory.checkAndComplete(op.key); + }, delayMs, TimeUnit.MILLISECONDS); + } + + static class MockDelayedOperation extends DelayedOperation { + + private final Optional<Lock> responseLockOpt; + boolean completable = false; + + MockDelayedOperation(long delayMs) { + this(delayMs, Optional.empty()); + } + + MockDelayedOperation(long delayMs, Optional<Lock> responseLockOpt) { + super(delayMs); + 
this.responseLockOpt = responseLockOpt; + } + + @Override + public boolean tryComplete() { + if (completable) { + return forceComplete(); + } else { + return false; + } + } + + @Override + public void onExpiration() { } + + @Override + public void onComplete() { + responseLockOpt.ifPresent(lock -> { + if (!lock.tryLock()) + throw new IllegalStateException("Response callback lock could not be acquired in callback"); + }); + synchronized (this) { + notify(); + } + } + + void awaitExpiration() throws InterruptedException { + synchronized (this) { + wait(); + } + } + } + + class TestDelayOperation extends MockDelayedOperation { + + final MockKey key; + final AtomicInteger completionAttemptsRemaining; + final int maxDelayMs; Review Comment: Should these member variables be private as well? ########## server-common/src/test/java/org/apache/kafka/server/purgatory/DelayedOperationTest.java: ########## @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.purgatory; + +import org.apache.kafka.common.utils.Time; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class DelayedOperationTest { + + private final MockKey test1 = new MockKey("test1"); + private final MockKey test2 = new MockKey("test2"); + private final MockKey test3 = new MockKey("test3"); + private final Random random = new Random(); + private DelayedOperationPurgatory<DelayedOperation> purgatory; + private ScheduledExecutorService executorService; + + @BeforeEach + public void setUp() { + purgatory = new DelayedOperationPurgatory<>("mock", 0); + } + + @AfterEach + public void tearDown() throws Exception { + purgatory.shutdown(); + if (executorService != null) + executorService.shutdown(); + } + + static class MockKey implements DelayedOperationKey { + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MockKey mockKey = (MockKey) o; + return Objects.equals(key, mockKey.key); + } + + @Override + public int hashCode() { + 
return key != null ? key.hashCode() : 0; + } + + final String key; + + MockKey(String key) { + this.key = key; + } + + @Override + public String keyLabel() { + return key; + } + } + + @Test + public void testLockInTryCompleteElseWatch() { + DelayedOperation op = new DelayedOperation(100000L) { + @Override + public void onExpiration() {} + @Override + public void onComplete() {} + @Override + public boolean tryComplete() { + assertTrue(((ReentrantLock) lock).isHeldByCurrentThread()); + return false; + } + @Override + public boolean safeTryComplete() { + fail("tryCompleteElseWatch should not use safeTryComplete"); + return super.safeTryComplete(); + } + }; + purgatory.tryCompleteElseWatch(op, Collections.singletonList(new MockKey("key"))); + } + + DelayedOperation op(boolean shouldComplete) { + return new DelayedOperation(100000L) { + @Override + public void onExpiration() {} + + @Override + public void onComplete() {} + + @Override + public boolean tryComplete() { + assertTrue(((ReentrantLock) lock).isHeldByCurrentThread()); + return shouldComplete; + } + }; + } + + @Test + public void testSafeTryCompleteOrElse() { + final AtomicBoolean pass = new AtomicBoolean(); + assertFalse(op(false).safeTryCompleteOrElse(() -> pass.set(true))); + assertTrue(pass.get()); + assertTrue(op(true).safeTryCompleteOrElse(() -> fail("this method should NOT be executed"))); + } + + @Test + public void testRequestSatisfaction() { + MockDelayedOperation r1 = new MockDelayedOperation(100000L); + MockDelayedOperation r2 = new MockDelayedOperation(100000L); + assertEquals(0, purgatory.checkAndComplete(test1), "With no waiting requests, nothing should be satisfied"); + assertFalse(purgatory.tryCompleteElseWatch(r1, Collections.singletonList(new MockKey("test1"))), "r1 not satisfied and hence watched"); + assertEquals(0, purgatory.checkAndComplete(test1), "Still nothing satisfied"); + assertFalse(purgatory.tryCompleteElseWatch(r2, Collections.singletonList(new MockKey("test2"))), "r2 not 
satisfied and hence watched"); + assertEquals(0, purgatory.checkAndComplete(test2), "Still nothing satisfied"); + r1.completable = true; + assertEquals(1, purgatory.checkAndComplete(test1), "r1 satisfied"); + assertEquals(0, purgatory.checkAndComplete(test1), "Nothing satisfied"); + r2.completable = true; + assertEquals(1, purgatory.checkAndComplete(test2), "r2 satisfied"); + assertEquals(0, purgatory.checkAndComplete(test2), "Nothing satisfied"); + } + + @Test + public void testRequestExpiry() throws Exception { + long expiration = 20L; + long start = Time.SYSTEM.hiResClockMs(); + MockDelayedOperation r1 = new MockDelayedOperation(expiration); + MockDelayedOperation r2 = new MockDelayedOperation(200000L); + assertFalse(purgatory.tryCompleteElseWatch(r1, Collections.singletonList(test1)), "r1 not satisfied and hence watched"); + assertFalse(purgatory.tryCompleteElseWatch(r2, Collections.singletonList(test2)), "r2 not satisfied and hence watched"); + r1.awaitExpiration(); + long elapsed = Time.SYSTEM.hiResClockMs() - start; + assertTrue(r1.isCompleted(), "r1 completed due to expiration"); + assertFalse(r2.isCompleted(), "r2 hasn't completed"); + assertTrue(elapsed >= expiration, "Time for expiration " + elapsed + " should at least " + expiration); + } + + @Test + public void testRequestPurge() { + MockDelayedOperation r1 = new MockDelayedOperation(100000L); + MockDelayedOperation r2 = new MockDelayedOperation(100000L); + MockDelayedOperation r3 = new MockDelayedOperation(100000L); + purgatory.tryCompleteElseWatch(r1, Collections.singletonList(test1)); + purgatory.tryCompleteElseWatch(r2, Arrays.asList(test1, test2)); + purgatory.tryCompleteElseWatch(r3, Arrays.asList(test1, test2, test3)); + + assertEquals(3, purgatory.numDelayed(), "Purgatory should have 3 total delayed operations"); + assertEquals(6, purgatory.watched(), "Purgatory should have 6 watched elements"); + + // complete the operations, it should immediately be purged from the delayed operation + 
r2.completable = true; + r2.tryComplete(); + assertEquals(2, purgatory.numDelayed(), "Purgatory should have 2 total delayed operations instead of " + purgatory.numDelayed()); + + r3.completable = true; + r3.tryComplete(); + assertEquals(1, purgatory.numDelayed(), "Purgatory should have 1 total delayed operations instead of " + purgatory.numDelayed()); + + // checking a watch should purge the watch list + purgatory.checkAndComplete(test1); + assertEquals(4, purgatory.watched(), "Purgatory should have 4 watched elements instead of " + purgatory.watched()); + + purgatory.checkAndComplete(test2); + assertEquals(2, purgatory.watched(), "Purgatory should have 2 watched elements instead of " + purgatory.watched()); + + purgatory.checkAndComplete(test3); + assertEquals(1, purgatory.watched(), "Purgatory should have 1 watched elements instead of " + purgatory.watched()); + } + + @Test + public void shouldCancelForKeyReturningCancelledOperations() { + purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Collections.singletonList(test1)); + purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Collections.singletonList(test1)); + purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Collections.singletonList(test2)); + + List<DelayedOperation> cancelledOperations = purgatory.cancelForKey(test1); + assertEquals(2, cancelledOperations.size()); + assertEquals(1, purgatory.numDelayed()); + assertEquals(1, purgatory.watched()); + } + + @Test + public void shouldReturnNilOperationsOnCancelForKeyWhenKeyDoesntExist() { + List<DelayedOperation> cancelledOperations = purgatory.cancelForKey(test1); + assertTrue(cancelledOperations.isEmpty()); + } + + /** + * Test `tryComplete` with multiple threads to verify that there are no timing windows + * when completion is not performed even if the thread that makes the operation completable + * may not be able to acquire the operation lock. 
Since it is difficult to test all scenarios, + * this test uses random delays with a large number of threads. + */ + @Test + public void testTryCompleteWithMultipleThreads() throws ExecutionException, InterruptedException { + executorService = Executors.newScheduledThreadPool(20); + int maxDelayMs = 10; + int completionAttempts = 20; + List<TestDelayOperation> ops = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + TestDelayOperation op = new TestDelayOperation(i, completionAttempts, maxDelayMs); + purgatory.tryCompleteElseWatch(op, Collections.singletonList(op.key)); + ops.add(op); + } + + List<Future<?>> futures = new ArrayList<>(); + for (int i = 1; i <= completionAttempts; i++) { + for (TestDelayOperation op : ops) { + futures.add(scheduleTryComplete(executorService, op, random.nextInt(maxDelayMs))); + } + } + for (Future<?> future : futures) { + future.get(); + } + ops.forEach(op -> assertTrue(op.isCompleted(), "Operation " + op.key.keyLabel() + " should have completed")); + } + + Future<?> scheduleTryComplete(ScheduledExecutorService executorService, TestDelayOperation op, long delayMs) { + return executorService.schedule(() -> { + if (op.completionAttemptsRemaining.decrementAndGet() == 0) { + op.completable = true; + } + purgatory.checkAndComplete(op.key); + }, delayMs, TimeUnit.MILLISECONDS); + } + + static class MockDelayedOperation extends DelayedOperation { + + private final Optional<Lock> responseLockOpt; + boolean completable = false; + + MockDelayedOperation(long delayMs) { + this(delayMs, Optional.empty()); + } + + MockDelayedOperation(long delayMs, Optional<Lock> responseLockOpt) { + super(delayMs); + this.responseLockOpt = responseLockOpt; + } + + @Override + public boolean tryComplete() { + if (completable) { + return forceComplete(); + } else { + return false; + } + } + + @Override + public void onExpiration() { } + + @Override + public void onComplete() { + responseLockOpt.ifPresent(lock -> { + if (!lock.tryLock()) + throw new 
IllegalStateException("Response callback lock could not be acquired in callback"); + }); + synchronized (this) { + notify(); + } + } + + void awaitExpiration() throws InterruptedException { + synchronized (this) { + wait(); + } + } + } + + class TestDelayOperation extends MockDelayedOperation { Review Comment: Can this be private as well? ########## server-common/src/test/java/org/apache/kafka/server/purgatory/DelayedOperationTest.java: ########## @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.purgatory; + +import org.apache.kafka.common.utils.Time; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class DelayedOperationTest { + + private final MockKey test1 = new MockKey("test1"); + private final MockKey test2 = new MockKey("test2"); + private final MockKey test3 = new MockKey("test3"); + private final Random random = new Random(); + private DelayedOperationPurgatory<DelayedOperation> purgatory; + private ScheduledExecutorService executorService; + + @BeforeEach + public void setUp() { + purgatory = new DelayedOperationPurgatory<>("mock", 0); + } + + @AfterEach + public void tearDown() throws Exception { + purgatory.shutdown(); + if (executorService != null) + executorService.shutdown(); + } + + static class MockKey implements DelayedOperationKey { Review Comment: ```suggestion private static class MockKey implements DelayedOperationKey { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
