junrao commented on code in PR #17636: URL: https://github.com/apache/kafka/pull/17636#discussion_r1828547614
########## server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java: ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.purgatory; + +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.server.util.ShutdownableThread; +import org.apache.kafka.server.util.timer.SystemTimer; +import org.apache.kafka.server.util.timer.Timer; +import org.apache.kafka.server.util.timer.TimerTask; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +public class DelayedOperationPurgatory<T extends DelayedOperation> { + + private static final Logger LOG = LoggerFactory.getLogger(DelayedOperationPurgatory.class); + private static final int SHARDS = 512; // Shard the watcher list to reduce lock contention + + private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "DelayedOperationPurgatory"); + private final Map<String, String> metricsTags; + private final List<WatcherList> watcherLists; + // the number of estimated total operations in the purgatory + private final AtomicInteger estimatedTotalOperations = new AtomicInteger(0); + /* background thread expiring operations that have timed out */ + private final ExpiredOperationReaper expirationReaper = new ExpiredOperationReaper(); + private final String purgatoryName; + private final Timer timeoutTimer; + private final int brokerId; + private final int purgeInterval; + private final boolean reaperEnabled; + private final boolean timerEnabled; + + public DelayedOperationPurgatory(String purgatoryName, Timer timer, int brokerId, boolean reaperEnabled) { + this(purgatoryName, timer, brokerId, 1000, reaperEnabled, true); + } + + public DelayedOperationPurgatory(String purgatoryName, int brokerId) { + this(purgatoryName, brokerId, 1000); + } + + public DelayedOperationPurgatory(String purgatoryName, int brokerId, int purgeInterval) { + this(purgatoryName, new SystemTimer(purgatoryName), brokerId, purgeInterval, true, true); + } + + /** + * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations. + */ + @SuppressWarnings("this-escape") + public DelayedOperationPurgatory(String purgatoryName, + Timer timeoutTimer, + int brokerId, + int purgeInterval, + boolean reaperEnabled, + boolean timerEnabled) { + this.purgatoryName = purgatoryName; + this.timeoutTimer = timeoutTimer; + this.brokerId = brokerId; + this.purgeInterval = purgeInterval; + this.reaperEnabled = reaperEnabled; + this.timerEnabled = timerEnabled; + + watcherLists = new ArrayList<>(SHARDS); + for (int i = 0; i < SHARDS; i++) { + watcherLists.add(new WatcherList()); + } + metricsTags = Collections.singletonMap("delayedOperation", purgatoryName); + metricsGroup.newGauge("PurgatorySize", this::watched, metricsTags); + metricsGroup.newGauge("NumDelayedOperations", this::numDelayed, metricsTags); + if (reaperEnabled) { + expirationReaper.start(); + } + } + + private WatcherList watcherList(DelayedOperationKey key) { + return watcherLists.get(Math.abs(key.hashCode() % watcherLists.size())); + } + + /** + * Check if the operation can be completed, if not watch it based on the given watch keys + * <br/> + * Note that a delayed operation can be watched on multiple keys. It is possible that + * an operation is completed after it has been added to the watch list for some, but + * not all the keys. In this case, the operation is considered completed and won't + * be added to the watch list of the remaining keys. The expiration reaper thread will + * remove this operation from any watcher list in which the operation exists. + * + * @param operation the delayed operation to be checked + * @param watchKeys keys for bookkeeping the operation + * @return true iff the delayed operations can be completed by the caller + */ + public <K extends DelayedOperationKey> boolean tryCompleteElseWatch(T operation, List<K> watchKeys) { + if (watchKeys.isEmpty()) { + throw new IllegalArgumentException("The watch key list can't be empty"); + } + + // The cost of tryComplete() is typically proportional to the number of keys. Calling tryComplete() for each key is + // going to be expensive if there are many keys. Instead, we do the check in the following way through safeTryCompleteOrElse(). + // If the operation is not completed, we just add the operation to all keys. Then we call tryComplete() again. At + // this time, if the operation is still not completed, we are guaranteed that it won't miss any future triggering + // event since the operation is already on the watcher list for all keys. + // + // ==============[story about lock]============== + // Through safeTryCompleteOrElse(), we hold the operation's lock while adding the operation to watch list and doing + // the tryComplete() check. This is to avoid a potential deadlock between the callers to tryCompleteElseWatch() and + // checkAndComplete(). For example, the following deadlock can happen if the lock is only held for the final tryComplete() + // 1) thread_a holds readlock of stateLock from TransactionStateManager + // 2) thread_a is executing tryCompleteElseWatch() + // 3) thread_a adds op to watch list + // 4) thread_b requires writelock of stateLock from TransactionStateManager (blocked by thread_a) + // 5) thread_c calls checkAndComplete() and holds lock of op + // 6) thread_c is waiting readlock of stateLock to complete op (blocked by thread_b) + // 7) thread_a is waiting lock of op to call the final tryComplete() (blocked by thread_c) + // + // Note that even with the current approach, deadlocks could still be introduced. For example, + // 1) thread_a calls tryCompleteElseWatch() and gets lock of op + // 2) thread_a adds op to watch list + // 3) thread_a calls op#tryComplete and tries to require lock_b + // 4) thread_b holds lock_b and calls checkAndComplete() + // 5) thread_b sees op from watch list + // 6) thread_b needs lock of op + // To avoid the above scenario, we recommend DelayedOperationPurgatory.checkAndComplete() be called without holding + // any exclusive lock. Since DelayedOperationPurgatory.checkAndComplete() completes delayed operations asynchronously, + // holding an exclusive lock to make the call is often unnecessary. + if (operation.safeTryCompleteOrElse(() -> { + watchKeys.forEach(key -> watchForOperation(key, operation)); + if (!watchKeys.isEmpty()) estimatedTotalOperations.incrementAndGet(); + })) { + return true; + } + + + // if it cannot be completed by now and hence is watched, add to the timeout queue also + if (!operation.isCompleted()) { + if (timerEnabled) + timeoutTimer.add(operation); + if (operation.isCompleted()) { + // cancel the timer task + operation.cancel(); + } + } + return false; + } + + /** + * Check if some delayed operations can be completed with the given watch key, + * and if yes complete them. + * + * @return the number of completed operations during this process + */ + public <K extends DelayedOperationKey> int checkAndComplete(K key) { + WatcherList wl = watcherList(key); + Watchers watchers; + wl.watchersLock.lock(); + try { + watchers = wl.watchersByKey.get(key); + } finally { + wl.watchersLock.unlock(); + } + int numCompleted = watchers == null ? 0 : watchers.tryCompleteWatched(); + + if (numCompleted > 0) { + LOG.debug("Request key {} unblocked {} {} operations", key, numCompleted, purgatoryName); + } + return numCompleted; + } + + /** + * Return the total size of watch lists the purgatory. Since an operation may be watched + * on multiple lists, and some of its watched entries may still be in the watch lists + * even when it has been completed, this number may be larger than the number of real operations watched + */ + public int watched() { + int sum = 0; + for (WatcherList watcherList : watcherLists) { + sum += watcherList.allWatchers().stream().map(Watchers::countWatched).mapToInt(c -> c).sum(); Review Comment: This could be a bit simpler. `sum += watcherList.allWatchers().stream().mapToInt(Watchers::countWatched).sum();` ########## server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.purgatory; + +import org.apache.kafka.server.util.timer.TimerTask; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * An operation whose processing needs to be delayed for at most the given delayMs. For example + * a delayed produce operation could be waiting for specified number of acks; or + * a delayed fetch operation could be waiting for a given number of bytes to accumulate. + * <br/> + * The logic upon completing a delayed operation is defined in onComplete() and will be called exactly once. + * Once an operation is completed, isCompleted() will return true. onComplete() can be triggered by either + * forceComplete(), which forces calling onComplete() after delayMs if the operation is not yet completed, + * or tryComplete(), which first checks if the operation can be completed or not now, and if yes calls + * forceComplete(). + * <br/> + * A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete(). + * <br/> + * Noted that if you add a future delayed operation that calls ReplicaManager.appendRecords() in onComplete() + * like DelayedJoin, you must be aware that this operation's onExpiration() needs to call actionQueue.tryCompleteAction(). + */ +public abstract class DelayedOperation extends TimerTask { + + private final AtomicBoolean completed = new AtomicBoolean(false); + // Visible for testing + final Lock lock; + + public DelayedOperation(long delayMs, Optional<Lock> lockOpt) { Review Comment: It seems that the only caller to this constructor is the one in ReplicaManager. Could you change ReplicaManager to use one of the other constructors depending on lockOpt? Then, we could remove this constructor. ########## core/src/main/scala/kafka/server/DelayedFuture.scala: ########## @@ -70,19 +73,22 @@ class DelayedFuture[T](timeoutMs: Long, } class DelayedFuturePurgatory(purgatoryName: String, brokerId: Int) { - private val purgatory = DelayedOperationPurgatory[DelayedFuture[_]](purgatoryName, brokerId) + private val purgatory = new DelayedOperationPurgatory[DelayedFuture[_]](purgatoryName, brokerId) private val executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable](), new ThreadFactory { override def newThread(r: Runnable): Thread = new KafkaThread(s"DelayedExecutor-$purgatoryName", r, true) }) - private val purgatoryKey = new Object + private val purgatoryKey = new DelayedOperationKey() { + + override def keyLabel(): String = new Object().toString Review Comment: It's better to provide a readable label. Perhaps "delayed-future-key"? ########## server-common/src/test/java/org/apache/kafka/server/purgatory/DelayedOperationTest.java: ########## @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.purgatory; + +import org.apache.kafka.common.utils.Time; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class DelayedOperationTest { + + private final MockKey test1 = new MockKey("test1"); + private final MockKey test2 = new MockKey("test2"); + private final MockKey test3 = new MockKey("test3"); + private final Random random = new Random(); + private DelayedOperationPurgatory<DelayedOperation> purgatory; + private ScheduledExecutorService executorService; + + @BeforeEach + public void setUp() { + purgatory = new DelayedOperationPurgatory<>("mock", 0); + } + + @AfterEach + public void tearDown() throws Exception { + purgatory.shutdown(); + if (executorService != null) + executorService.shutdown(); + } + + static class MockKey implements DelayedOperationKey { + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MockKey mockKey = (MockKey) o; + return Objects.equals(key, mockKey.key); + } + + @Override + public int hashCode() { + return key != null ? key.hashCode() : 0; + } + + final String key; + + MockKey(String key) { + this.key = key; + } + + @Override + public String keyLabel() { + return key; + } + } + + @Test + public void testLockInTryCompleteElseWatch() { + DelayedOperation op = new DelayedOperation(100000L) { + @Override + public void onExpiration() {} + @Override + public void onComplete() {} + @Override + public boolean tryComplete() { + assertTrue(((ReentrantLock) lock).isHeldByCurrentThread()); + return false; + } + @Override + public boolean safeTryComplete() { + fail("tryCompleteElseWatch should not use safeTryComplete"); + return super.safeTryComplete(); + } + }; + purgatory.tryCompleteElseWatch(op, Collections.singletonList(new MockKey("key"))); + } + + DelayedOperation op(boolean shouldComplete) { Review Comment: Could this be private? ########## server-common/src/test/java/org/apache/kafka/server/purgatory/DelayedOperationTest.java: ########## @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.purgatory; + +import org.apache.kafka.common.utils.Time; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class DelayedOperationTest { + + private final MockKey test1 = new MockKey("test1"); + private final MockKey test2 = new MockKey("test2"); + private final MockKey test3 = new MockKey("test3"); + private final Random random = new Random(); + private DelayedOperationPurgatory<DelayedOperation> purgatory; + private ScheduledExecutorService executorService; + + @BeforeEach + public void setUp() { + purgatory = new DelayedOperationPurgatory<>("mock", 0); + } + + @AfterEach + public void tearDown() throws Exception { + purgatory.shutdown(); + if (executorService != null) + executorService.shutdown(); + } + + static class MockKey implements DelayedOperationKey { + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MockKey mockKey = (MockKey) o; + return Objects.equals(key, mockKey.key); + } + + @Override + public int hashCode() { + return key != null ? key.hashCode() : 0; + } + + final String key; + + MockKey(String key) { + this.key = key; + } + + @Override + public String keyLabel() { + return key; + } + } + + @Test + public void testLockInTryCompleteElseWatch() { + DelayedOperation op = new DelayedOperation(100000L) { + @Override + public void onExpiration() {} + @Override + public void onComplete() {} + @Override + public boolean tryComplete() { + assertTrue(((ReentrantLock) lock).isHeldByCurrentThread()); + return false; + } + @Override + public boolean safeTryComplete() { + fail("tryCompleteElseWatch should not use safeTryComplete"); + return super.safeTryComplete(); + } + }; + purgatory.tryCompleteElseWatch(op, Collections.singletonList(new MockKey("key"))); + } + + DelayedOperation op(boolean shouldComplete) { + return new DelayedOperation(100000L) { + @Override + public void onExpiration() {} + + @Override + public void onComplete() {} + + @Override + public boolean tryComplete() { + assertTrue(((ReentrantLock) lock).isHeldByCurrentThread()); + return shouldComplete; + } + }; + } + + @Test + public void testSafeTryCompleteOrElse() { + final AtomicBoolean pass = new AtomicBoolean(); + assertFalse(op(false).safeTryCompleteOrElse(() -> pass.set(true))); + assertTrue(pass.get()); + assertTrue(op(true).safeTryCompleteOrElse(() -> fail("this method should NOT be executed"))); + } + + @Test + public void testRequestSatisfaction() { + MockDelayedOperation r1 = new MockDelayedOperation(100000L); + MockDelayedOperation r2 = new MockDelayedOperation(100000L); + assertEquals(0, purgatory.checkAndComplete(test1), "With no waiting requests, nothing should be satisfied"); + assertFalse(purgatory.tryCompleteElseWatch(r1, Collections.singletonList(new MockKey("test1"))), "r1 not satisfied and hence watched"); + assertEquals(0, purgatory.checkAndComplete(test1), "Still nothing satisfied"); + assertFalse(purgatory.tryCompleteElseWatch(r2, Collections.singletonList(new MockKey("test2"))), "r2 not satisfied and hence watched"); + assertEquals(0, purgatory.checkAndComplete(test2), "Still nothing satisfied"); + r1.completable = true; + assertEquals(1, purgatory.checkAndComplete(test1), "r1 satisfied"); + assertEquals(0, purgatory.checkAndComplete(test1), "Nothing satisfied"); + r2.completable = true; + assertEquals(1, purgatory.checkAndComplete(test2), "r2 satisfied"); + assertEquals(0, purgatory.checkAndComplete(test2), "Nothing satisfied"); + } + + @Test + public void testRequestExpiry() throws Exception { + long expiration = 20L; + long start = Time.SYSTEM.hiResClockMs(); + MockDelayedOperation r1 = new MockDelayedOperation(expiration); + MockDelayedOperation r2 = new MockDelayedOperation(200000L); + assertFalse(purgatory.tryCompleteElseWatch(r1, Collections.singletonList(test1)), "r1 not satisfied and hence watched"); + assertFalse(purgatory.tryCompleteElseWatch(r2, Collections.singletonList(test2)), "r2 not satisfied and hence watched"); + r1.awaitExpiration(); + long elapsed = Time.SYSTEM.hiResClockMs() - start; + assertTrue(r1.isCompleted(), "r1 completed due to expiration"); + assertFalse(r2.isCompleted(), "r2 hasn't completed"); + assertTrue(elapsed >= expiration, "Time for expiration " + elapsed + " should at least " + expiration); + } + + @Test + public void testRequestPurge() { + MockDelayedOperation r1 = new MockDelayedOperation(100000L); + MockDelayedOperation r2 = new MockDelayedOperation(100000L); + MockDelayedOperation r3 = new MockDelayedOperation(100000L); + purgatory.tryCompleteElseWatch(r1, Collections.singletonList(test1)); + purgatory.tryCompleteElseWatch(r2, Arrays.asList(test1, test2)); + purgatory.tryCompleteElseWatch(r3, Arrays.asList(test1, test2, test3)); + + assertEquals(3, purgatory.numDelayed(), "Purgatory should have 3 total delayed operations"); + assertEquals(6, purgatory.watched(), "Purgatory should have 6 watched elements"); + + // complete the operations, it should immediately be purged from the delayed operation + r2.completable = true; + r2.tryComplete(); + assertEquals(2, purgatory.numDelayed(), "Purgatory should have 2 total delayed operations instead of " + purgatory.numDelayed()); + + r3.completable = true; + r3.tryComplete(); + assertEquals(1, purgatory.numDelayed(), "Purgatory should have 1 total delayed operations instead of " + purgatory.numDelayed()); + + // checking a watch should purge the watch list + purgatory.checkAndComplete(test1); + assertEquals(4, purgatory.watched(), "Purgatory should have 4 watched elements instead of " + purgatory.watched()); + + purgatory.checkAndComplete(test2); + assertEquals(2, purgatory.watched(), "Purgatory should have 2 watched elements instead of " + purgatory.watched()); + + purgatory.checkAndComplete(test3); + assertEquals(1, purgatory.watched(), "Purgatory should have 1 watched elements instead of " + purgatory.watched()); + } + + @Test + public void shouldCancelForKeyReturningCancelledOperations() { + purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Collections.singletonList(test1)); + purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Collections.singletonList(test1)); + purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Collections.singletonList(test2)); + + List<DelayedOperation> cancelledOperations = purgatory.cancelForKey(test1); + assertEquals(2, cancelledOperations.size()); + assertEquals(1, purgatory.numDelayed()); + assertEquals(1, purgatory.watched()); + } + + @Test + public void shouldReturnNilOperationsOnCancelForKeyWhenKeyDoesntExist() { + List<DelayedOperation> cancelledOperations = purgatory.cancelForKey(test1); + assertTrue(cancelledOperations.isEmpty()); + } + + /** + * Test `tryComplete` with multiple threads to verify that there are no timing windows + * when completion is not performed even if the thread that makes the operation completable + * may not be able to acquire the operation lock. Since it is difficult to test all scenarios, + * this test uses random delays with a large number of threads. + */ + @Test + public void testTryCompleteWithMultipleThreads() throws ExecutionException, InterruptedException { + executorService = Executors.newScheduledThreadPool(20); + int maxDelayMs = 10; + int completionAttempts = 20; + List<TestDelayOperation> ops = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + TestDelayOperation op = new TestDelayOperation(i, completionAttempts, maxDelayMs); + purgatory.tryCompleteElseWatch(op, Collections.singletonList(op.key)); + ops.add(op); + } + + List<Future<?>> futures = new ArrayList<>(); + for (int i = 1; i <= completionAttempts; i++) { + for (TestDelayOperation op : ops) { + futures.add(scheduleTryComplete(executorService, op, random.nextInt(maxDelayMs))); + } + } + for (Future<?> future : futures) { + future.get(); + } + ops.forEach(op -> assertTrue(op.isCompleted(), "Operation " + op.key.keyLabel() + " should have completed")); + } + + Future<?> scheduleTryComplete(ScheduledExecutorService executorService, TestDelayOperation op, long delayMs) { Review Comment: Could this be private? ########## server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java: ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.purgatory; + +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.server.util.ShutdownableThread; +import org.apache.kafka.server.util.timer.SystemTimer; +import org.apache.kafka.server.util.timer.Timer; +import org.apache.kafka.server.util.timer.TimerTask; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +public class DelayedOperationPurgatory<T extends DelayedOperation> { + + private static final Logger LOG = LoggerFactory.getLogger(DelayedOperationPurgatory.class); + private static final int SHARDS = 512; // Shard the watcher list to reduce lock contention + + private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "DelayedOperationPurgatory"); + private final Map<String, String> metricsTags; + private final List<WatcherList> watcherLists; + // the number of estimated total operations in the purgatory + private final AtomicInteger estimatedTotalOperations = new AtomicInteger(0); + /* background thread expiring operations that have timed out */ + private final ExpiredOperationReaper expirationReaper = new ExpiredOperationReaper(); + private final String purgatoryName; + private final Timer timeoutTimer; + private final int brokerId; + private final int purgeInterval; + private final boolean reaperEnabled; + private final boolean timerEnabled; + + public DelayedOperationPurgatory(String purgatoryName, Timer timer, int brokerId, boolean reaperEnabled) { + this(purgatoryName, timer, brokerId, 1000, reaperEnabled, true); + } + + public DelayedOperationPurgatory(String purgatoryName, int brokerId) { + this(purgatoryName, brokerId, 1000); + } + + public DelayedOperationPurgatory(String purgatoryName, int brokerId, int purgeInterval) { + this(purgatoryName, new SystemTimer(purgatoryName), brokerId, purgeInterval, true, true); + } + + /** + * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations. + */ + @SuppressWarnings("this-escape") + public DelayedOperationPurgatory(String purgatoryName, + Timer timeoutTimer, + int brokerId, + int purgeInterval, + boolean reaperEnabled, + boolean timerEnabled) { + this.purgatoryName = purgatoryName; + this.timeoutTimer = timeoutTimer; + this.brokerId = brokerId; + this.purgeInterval = purgeInterval; + this.reaperEnabled = reaperEnabled; + this.timerEnabled = timerEnabled; + + watcherLists = new ArrayList<>(SHARDS); + for (int i = 0; i < SHARDS; i++) { + watcherLists.add(new WatcherList()); + } + metricsTags = Collections.singletonMap("delayedOperation", purgatoryName); + metricsGroup.newGauge("PurgatorySize", this::watched, metricsTags); + metricsGroup.newGauge("NumDelayedOperations", this::numDelayed, metricsTags); + if (reaperEnabled) { + expirationReaper.start(); + } + } + + private WatcherList watcherList(DelayedOperationKey key) { + return watcherLists.get(Math.abs(key.hashCode() % watcherLists.size())); + } + + /** + * Check if the operation can be completed, if not watch it based on the given watch keys + * <br/> + * Note that a delayed operation can be watched on multiple keys. It is possible that + * an operation is completed after it has been added to the watch list for some, but + * not all the keys. In this case, the operation is considered completed and won't + * be added to the watch list of the remaining keys. The expiration reaper thread will + * remove this operation from any watcher list in which the operation exists. + * + * @param operation the delayed operation to be checked + * @param watchKeys keys for bookkeeping the operation + * @return true iff the delayed operations can be completed by the caller + */ + public <K extends DelayedOperationKey> boolean tryCompleteElseWatch(T operation, List<K> watchKeys) { + if (watchKeys.isEmpty()) { + throw new IllegalArgumentException("The watch key list can't be empty"); + } + + // The cost of tryComplete() is typically proportional to the number of keys. Calling tryComplete() for each key is + // going to be expensive if there are many keys. Instead, we do the check in the following way through safeTryCompleteOrElse(). + // If the operation is not completed, we just add the operation to all keys. Then we call tryComplete() again. At + // this time, if the operation is still not completed, we are guaranteed that it won't miss any future triggering + // event since the operation is already on the watcher list for all keys. + // + // ==============[story about lock]============== + // Through safeTryCompleteOrElse(), we hold the operation's lock while adding the operation to watch list and doing + // the tryComplete() check. This is to avoid a potential deadlock between the callers to tryCompleteElseWatch() and + // checkAndComplete(). For example, the following deadlock can happen if the lock is only held for the final tryComplete() + // 1) thread_a holds readlock of stateLock from TransactionStateManager + // 2) thread_a is executing tryCompleteElseWatch() + // 3) thread_a adds op to watch list + // 4) thread_b requires writelock of stateLock from TransactionStateManager (blocked by thread_a) + // 5) thread_c calls checkAndComplete() and holds lock of op + // 6) thread_c is waiting readlock of stateLock to complete op (blocked by thread_b) + // 7) thread_a is waiting lock of op to call the final tryComplete() (blocked by thread_c) + // + // Note that even with the current approach, deadlocks could still be introduced. For example, + // 1) thread_a calls tryCompleteElseWatch() and gets lock of op + // 2) thread_a adds op to watch list + // 3) thread_a calls op#tryComplete and tries to require lock_b + // 4) thread_b holds lock_b and calls checkAndComplete() + // 5) thread_b sees op from watch list + // 6) thread_b needs lock of op + // To avoid the above scenario, we recommend DelayedOperationPurgatory.checkAndComplete() be called without holding + // any exclusive lock. Since DelayedOperationPurgatory.checkAndComplete() completes delayed operations asynchronously, + // holding an exclusive lock to make the call is often unnecessary. + if (operation.safeTryCompleteOrElse(() -> { + watchKeys.forEach(key -> watchForOperation(key, operation)); + if (!watchKeys.isEmpty()) estimatedTotalOperations.incrementAndGet(); Review Comment: Split into two lines? ########## core/src/main/scala/kafka/server/TopicKey.scala: ########## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import org.apache.kafka.server.purgatory.DelayedOperationKey + +/* used by delayed-topic operations */ +case class TopicKey(topic: String) extends DelayedOperationKey { Review Comment: Could we write it in java? ########## server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java: ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.purgatory; + +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.server.util.ShutdownableThread; +import org.apache.kafka.server.util.timer.SystemTimer; +import org.apache.kafka.server.util.timer.Timer; +import org.apache.kafka.server.util.timer.TimerTask; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +public class DelayedOperationPurgatory<T extends DelayedOperation> { + + private static final Logger LOG = LoggerFactory.getLogger(DelayedOperationPurgatory.class); + private static final int SHARDS = 512; // Shard the watcher list to reduce lock contention + + private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "DelayedOperationPurgatory"); + private final Map<String, String> metricsTags; + private final List<WatcherList> watcherLists; + // the number of estimated total operations in the purgatory + private final AtomicInteger estimatedTotalOperations = new AtomicInteger(0); + /* background thread expiring operations that have timed out */ + private final ExpiredOperationReaper expirationReaper = new ExpiredOperationReaper(); + private final String purgatoryName; + private final Timer timeoutTimer; + private final int brokerId; + private final int purgeInterval; + private final boolean reaperEnabled; + private final boolean timerEnabled; + + public DelayedOperationPurgatory(String purgatoryName, Timer timer, int brokerId, boolean reaperEnabled) { + this(purgatoryName, timer, brokerId, 1000, reaperEnabled, true); + } + + public DelayedOperationPurgatory(String purgatoryName, int brokerId) { + this(purgatoryName, brokerId, 1000); + } + + public DelayedOperationPurgatory(String purgatoryName, int brokerId, int purgeInterval) { + this(purgatoryName, new SystemTimer(purgatoryName), brokerId, purgeInterval, true, true); + } + + /** + * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations. + */ + @SuppressWarnings("this-escape") + public DelayedOperationPurgatory(String purgatoryName, + Timer timeoutTimer, + int brokerId, + int purgeInterval, + boolean reaperEnabled, + boolean timerEnabled) { + this.purgatoryName = purgatoryName; + this.timeoutTimer = timeoutTimer; + this.brokerId = brokerId; + this.purgeInterval = purgeInterval; + this.reaperEnabled = reaperEnabled; + this.timerEnabled = timerEnabled; + + watcherLists = new ArrayList<>(SHARDS); + for (int i = 0; i < SHARDS; i++) { + watcherLists.add(new WatcherList()); + } + metricsTags = Collections.singletonMap("delayedOperation", purgatoryName); + metricsGroup.newGauge("PurgatorySize", this::watched, metricsTags); + metricsGroup.newGauge("NumDelayedOperations", this::numDelayed, metricsTags); + if (reaperEnabled) { + expirationReaper.start(); + } + } + + private WatcherList watcherList(DelayedOperationKey key) { + return watcherLists.get(Math.abs(key.hashCode() % watcherLists.size())); + } + + /** + * Check if the operation can be completed, if not watch it based on the given watch keys + * <br/> + * Note that a delayed operation can be watched on multiple keys. It is possible that + * an operation is completed after it has been added to the watch list for some, but + * not all the keys. In this case, the operation is considered completed and won't + * be added to the watch list of the remaining keys. The expiration reaper thread will + * remove this operation from any watcher list in which the operation exists. + * + * @param operation the delayed operation to be checked + * @param watchKeys keys for bookkeeping the operation + * @return true iff the delayed operations can be completed by the caller + */ + public <K extends DelayedOperationKey> boolean tryCompleteElseWatch(T operation, List<K> watchKeys) { + if (watchKeys.isEmpty()) { + throw new IllegalArgumentException("The watch key list can't be empty"); + } + + // The cost of tryComplete() is typically proportional to the number of keys. Calling tryComplete() for each key is + // going to be expensive if there are many keys. Instead, we do the check in the following way through safeTryCompleteOrElse(). + // If the operation is not completed, we just add the operation to all keys. Then we call tryComplete() again. At + // this time, if the operation is still not completed, we are guaranteed that it won't miss any future triggering + // event since the operation is already on the watcher list for all keys. + // + // ==============[story about lock]============== + // Through safeTryCompleteOrElse(), we hold the operation's lock while adding the operation to watch list and doing + // the tryComplete() check. This is to avoid a potential deadlock between the callers to tryCompleteElseWatch() and + // checkAndComplete(). For example, the following deadlock can happen if the lock is only held for the final tryComplete() + // 1) thread_a holds readlock of stateLock from TransactionStateManager + // 2) thread_a is executing tryCompleteElseWatch() + // 3) thread_a adds op to watch list + // 4) thread_b requires writelock of stateLock from TransactionStateManager (blocked by thread_a) + // 5) thread_c calls checkAndComplete() and holds lock of op + // 6) thread_c is waiting readlock of stateLock to complete op (blocked by thread_b) + // 7) thread_a is waiting lock of op to call the final tryComplete() (blocked by thread_c) + // + // Note that even with the current approach, deadlocks could still be introduced. For example, + // 1) thread_a calls tryCompleteElseWatch() and gets lock of op + // 2) thread_a adds op to watch list + // 3) thread_a calls op#tryComplete and tries to require lock_b + // 4) thread_b holds lock_b and calls checkAndComplete() + // 5) thread_b sees op from watch list + // 6) thread_b needs lock of op + // To avoid the above scenario, we recommend DelayedOperationPurgatory.checkAndComplete() be called without holding + // any exclusive lock. Since DelayedOperationPurgatory.checkAndComplete() completes delayed operations asynchronously, + // holding an exclusive lock to make the call is often unnecessary. + if (operation.safeTryCompleteOrElse(() -> { + watchKeys.forEach(key -> watchForOperation(key, operation)); + if (!watchKeys.isEmpty()) estimatedTotalOperations.incrementAndGet(); + })) { + return true; + } + + + // if it cannot be completed by now and hence is watched, add to the timeout queue also + if (!operation.isCompleted()) { + if (timerEnabled) + timeoutTimer.add(operation); + if (operation.isCompleted()) { + // cancel the timer task + operation.cancel(); + } + } + return false; + } + + /** + * Check if some delayed operations can be completed with the given watch key, + * and if yes complete them. + * + * @return the number of completed operations during this process + */ + public <K extends DelayedOperationKey> int checkAndComplete(K key) { + WatcherList wl = watcherList(key); + Watchers watchers; + wl.watchersLock.lock(); + try { + watchers = wl.watchersByKey.get(key); + } finally { + wl.watchersLock.unlock(); + } + int numCompleted = watchers == null ? 0 : watchers.tryCompleteWatched(); + + if (numCompleted > 0) { + LOG.debug("Request key {} unblocked {} {} operations", key, numCompleted, purgatoryName); + } + return numCompleted; + } + + /** + * Return the total size of watch lists the purgatory. Since an operation may be watched + * on multiple lists, and some of its watched entries may still be in the watch lists + * even when it has been completed, this number may be larger than the number of real operations watched + */ + public int watched() { + int sum = 0; + for (WatcherList watcherList : watcherLists) { + sum += watcherList.allWatchers().stream().map(Watchers::countWatched).mapToInt(c -> c).sum(); + } + return sum; + } + + /** + * Return the number of delayed operations in the expiry queue + */ + public int numDelayed() { + return timeoutTimer.size(); + } + + /** + * Cancel watching on any delayed operations for the given key. Note the operation will not be completed + */ + public List<T> cancelForKey(DelayedOperationKey key) { + WatcherList wl = watcherList(key); + wl.watchersLock.lock(); + try { + Watchers watchers = wl.watchersByKey.remove(key); + if (watchers != null) + return watchers.cancel(); + else + return Collections.emptyList(); + } finally { + wl.watchersLock.unlock(); + } + } + + /* + * Return the watch list of the given key, note that we need to + * grab the removeWatchersLock to avoid the operation being added to a removed watcher list Review Comment: This is an existing issue. removeWatchersLock no longer exists. ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -1014,12 +1015,12 @@ class ReplicaManager(val config: KafkaConfig, val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback, delayedProduceLock) // create a list of (topic, partition) pairs to use as keys for this delayed produce operation - val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq + val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq Review Comment: Here we continue to use `toSeq`, but in some other places, we use `toList`. Could we be more consistent? ########## server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java: ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.purgatory; + +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.server.util.ShutdownableThread; +import org.apache.kafka.server.util.timer.SystemTimer; +import org.apache.kafka.server.util.timer.Timer; +import org.apache.kafka.server.util.timer.TimerTask; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +public class DelayedOperationPurgatory<T extends DelayedOperation> { + + private static final Logger LOG = LoggerFactory.getLogger(DelayedOperationPurgatory.class); + private static final int SHARDS = 512; // Shard the watcher list to reduce lock contention + + private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "DelayedOperationPurgatory"); + private final Map<String, String> metricsTags; + private final List<WatcherList> watcherLists; + // the number of estimated total operations in the purgatory + private final AtomicInteger estimatedTotalOperations = new AtomicInteger(0); + /* background thread expiring operations that have timed out */ + private final ExpiredOperationReaper expirationReaper = new ExpiredOperationReaper(); + private final String purgatoryName; + private final Timer timeoutTimer; + private final int brokerId; + private final int purgeInterval; + private final boolean reaperEnabled; + private final boolean timerEnabled; + + public DelayedOperationPurgatory(String purgatoryName, Timer timer, int brokerId, boolean reaperEnabled) { + this(purgatoryName, timer, brokerId, 1000, reaperEnabled, true); + } + + public DelayedOperationPurgatory(String purgatoryName, int brokerId) { + this(purgatoryName, brokerId, 1000); + } + + public DelayedOperationPurgatory(String purgatoryName, int brokerId, int purgeInterval) { + this(purgatoryName, new SystemTimer(purgatoryName), brokerId, purgeInterval, true, true); + } + + /** + * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations. + */ + @SuppressWarnings("this-escape") + public DelayedOperationPurgatory(String purgatoryName, + Timer timeoutTimer, + int brokerId, + int purgeInterval, + boolean reaperEnabled, + boolean timerEnabled) { + this.purgatoryName = purgatoryName; + this.timeoutTimer = timeoutTimer; + this.brokerId = brokerId; + this.purgeInterval = purgeInterval; + this.reaperEnabled = reaperEnabled; + this.timerEnabled = timerEnabled; + + watcherLists = new ArrayList<>(SHARDS); + for (int i = 0; i < SHARDS; i++) { + watcherLists.add(new WatcherList()); + } + metricsTags = Collections.singletonMap("delayedOperation", purgatoryName); + metricsGroup.newGauge("PurgatorySize", this::watched, metricsTags); + metricsGroup.newGauge("NumDelayedOperations", this::numDelayed, metricsTags); + if (reaperEnabled) { + expirationReaper.start(); + } + } + + private WatcherList watcherList(DelayedOperationKey key) { + return watcherLists.get(Math.abs(key.hashCode() % watcherLists.size())); + } + + /** + * Check if the operation can be completed, if not watch it based on the given watch keys + * <br/> + * Note that a delayed operation can be watched on multiple keys. It is possible that + * an operation is completed after it has been added to the watch list for some, but + * not all the keys. In this case, the operation is considered completed and won't + * be added to the watch list of the remaining keys. The expiration reaper thread will + * remove this operation from any watcher list in which the operation exists. + * + * @param operation the delayed operation to be checked + * @param watchKeys keys for bookkeeping the operation + * @return true iff the delayed operations can be completed by the caller + */ + public <K extends DelayedOperationKey> boolean tryCompleteElseWatch(T operation, List<K> watchKeys) { + if (watchKeys.isEmpty()) { + throw new IllegalArgumentException("The watch key list can't be empty"); + } + + // The cost of tryComplete() is typically proportional to the number of keys. Calling tryComplete() for each key is + // going to be expensive if there are many keys. Instead, we do the check in the following way through safeTryCompleteOrElse(). + // If the operation is not completed, we just add the operation to all keys. Then we call tryComplete() again. At + // this time, if the operation is still not completed, we are guaranteed that it won't miss any future triggering + // event since the operation is already on the watcher list for all keys. + // + // ==============[story about lock]============== + // Through safeTryCompleteOrElse(), we hold the operation's lock while adding the operation to watch list and doing + // the tryComplete() check. This is to avoid a potential deadlock between the callers to tryCompleteElseWatch() and + // checkAndComplete(). For example, the following deadlock can happen if the lock is only held for the final tryComplete() + // 1) thread_a holds readlock of stateLock from TransactionStateManager + // 2) thread_a is executing tryCompleteElseWatch() + // 3) thread_a adds op to watch list + // 4) thread_b requires writelock of stateLock from TransactionStateManager (blocked by thread_a) + // 5) thread_c calls checkAndComplete() and holds lock of op + // 6) thread_c is waiting readlock of stateLock to complete op (blocked by thread_b) + // 7) thread_a is waiting lock of op to call the final tryComplete() (blocked by thread_c) + // + // Note that even with the current approach, deadlocks could still be introduced. For example, + // 1) thread_a calls tryCompleteElseWatch() and gets lock of op + // 2) thread_a adds op to watch list + // 3) thread_a calls op#tryComplete and tries to require lock_b + // 4) thread_b holds lock_b and calls checkAndComplete() + // 5) thread_b sees op from watch list + // 6) thread_b needs lock of op + // To avoid the above scenario, we recommend DelayedOperationPurgatory.checkAndComplete() be called without holding + // any exclusive lock. Since DelayedOperationPurgatory.checkAndComplete() completes delayed operations asynchronously, + // holding an exclusive lock to make the call is often unnecessary. + if (operation.safeTryCompleteOrElse(() -> { + watchKeys.forEach(key -> watchForOperation(key, operation)); + if (!watchKeys.isEmpty()) estimatedTotalOperations.incrementAndGet(); + })) { + return true; + } + + + // if it cannot be completed by now and hence is watched, add to the timeout queue also + if (!operation.isCompleted()) { + if (timerEnabled) + timeoutTimer.add(operation); + if (operation.isCompleted()) { + // cancel the timer task + operation.cancel(); + } + } + return false; + } + + /** + * Check if some delayed operations can be completed with the given watch key, + * and if yes complete them. + * + * @return the number of completed operations during this process + */ + public <K extends DelayedOperationKey> int checkAndComplete(K key) { + WatcherList wl = watcherList(key); + Watchers watchers; + wl.watchersLock.lock(); + try { + watchers = wl.watchersByKey.get(key); + } finally { + wl.watchersLock.unlock(); + } + int numCompleted = watchers == null ? 0 : watchers.tryCompleteWatched(); + + if (numCompleted > 0) { + LOG.debug("Request key {} unblocked {} {} operations", key, numCompleted, purgatoryName); + } + return numCompleted; + } + + /** + * Return the total size of watch lists the purgatory. Since an operation may be watched + * on multiple lists, and some of its watched entries may still be in the watch lists + * even when it has been completed, this number may be larger than the number of real operations watched + */ + public int watched() { + int sum = 0; + for (WatcherList watcherList : watcherLists) { + sum += watcherList.allWatchers().stream().map(Watchers::countWatched).mapToInt(c -> c).sum(); + } + return sum; + } + + /** + * Return the number of delayed operations in the expiry queue + */ + public int numDelayed() { + return timeoutTimer.size(); + } + + /** + * Cancel watching on any delayed operations for the given key. Note the operation will not be completed + */ + public List<T> cancelForKey(DelayedOperationKey key) { + WatcherList wl = watcherList(key); + wl.watchersLock.lock(); + try { + Watchers watchers = wl.watchersByKey.remove(key); + if (watchers != null) + return watchers.cancel(); + else + return Collections.emptyList(); + } finally { + wl.watchersLock.unlock(); + } + } + + /* + * Return the watch list of the given key, note that we need to + * grab the removeWatchersLock to avoid the operation being added to a removed watcher list + */ + private void watchForOperation(DelayedOperationKey key, T operation) { + WatcherList wl = watcherList(key); + wl.watchersLock.lock(); + try { + Watchers watcher = wl.watchersByKey.computeIfAbsent(key, Watchers::new); + watcher.watch(operation); + } finally { + wl.watchersLock.unlock(); + } + } + + /* + * Remove the key from watcher lists if its list is empty + */ + private void removeKeyIfEmpty(DelayedOperationKey key, Watchers watchers) { + WatcherList wl = watcherList(key); + wl.watchersLock.lock(); + try { + // if the current key is no longer correlated to the watchers to remove, skip + if (wl.watchersByKey.get(key) != watchers) + return; + + if (watchers != null && watchers.isEmpty()) { + wl.watchersByKey.remove(key); + } + } finally { + wl.watchersLock.unlock(); + } + } + + /** + * Shutdown the expiration reaper thread + */ + public void shutdown() throws Exception { + if (reaperEnabled) { + expirationReaper.initiateShutdown(); + // improve shutdown time by waking up any ShutdownableThread blocked on poll by sending a no-op + timeoutTimer.add(new TimerTask(0) { + @Override + public void run() {} + }); + expirationReaper.awaitShutdown(); + } + timeoutTimer.close(); + metricsGroup.removeMetric("PurgatorySize", metricsTags); + metricsGroup.removeMetric("NumDelayedOperations", metricsTags); + } + + /** + * A list of operation watching keys + */ + class WatcherList { + private final ConcurrentHashMap<DelayedOperationKey, Watchers> watchersByKey = new ConcurrentHashMap<>(); + + private final ReentrantLock watchersLock = new ReentrantLock(); + + /* + * Return all the current watcher lists, + * note that the returned watchers may be removed from the list by other threads + */ + Collection<Watchers> allWatchers() { + return watchersByKey.values(); + } + } + + /** + * A linked list of watched delayed operations based on some key + */ + class Watchers { + + private final ConcurrentLinkedQueue<T> operations = new ConcurrentLinkedQueue<>(); + + private final DelayedOperationKey key; + Watchers(DelayedOperationKey key) { + this.key = key; + } + + // count the current number of watched operations. This is O(n), so use isEmpty() if possible + int countWatched() { + return operations.size(); + } + + boolean isEmpty() { + return operations.isEmpty(); + } + + // add the element to watch + void watch(T t) { + operations.add(t); + } + + // traverse the list and try to complete some watched elements + int tryCompleteWatched() { + int completed = 0; + + Iterator<T> iter = operations.iterator(); + while (iter.hasNext()) { + T curr = iter.next(); + if (curr.isCompleted()) { + // another thread has completed this operation, just remove it + iter.remove(); + } else if (curr.safeTryComplete()) { + iter.remove(); + completed += 1; + } + } + + if (operations.isEmpty()) + removeKeyIfEmpty(key, this); + + return completed; + } + + List<T> cancel() { + Iterator<T> iter = operations.iterator(); + List<T> cancelled = new ArrayList<>(); + while (iter.hasNext()) { + T curr = iter.next(); + curr.cancel(); + iter.remove(); + cancelled.add(curr); + } + return cancelled; + } + + // traverse the list and purge elements that are already completed by others + int purgeCompleted() { + int purged = 0; + + Iterator<T> iter = operations.iterator(); + while (iter.hasNext()) { + T curr = iter.next(); + if (curr.isCompleted()) { + iter.remove(); + purged += 1; + } + } + + if (operations.isEmpty()) + removeKeyIfEmpty(key, this); + + return purged; + } + } + + private void advanceClock(long timeoutMs) throws InterruptedException { + timeoutTimer.advanceClock(timeoutMs); + + // Trigger a purge if the number of completed but still being watched operations is larger than + // the purge threshold. That number is computed by the difference btw the estimated total number of + // operations and the number of pending delayed operations. + if (estimatedTotalOperations.get() - numDelayed() > purgeInterval) { + // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to + // clean up watchers. Note that, if more operations are completed during the cleanup, we may end up with + // a little overestimated total number of operations. + estimatedTotalOperations.getAndSet(numDelayed()); + LOG.debug("Begin purging watch lists"); + int purged = 0; + for (WatcherList watcherList : watcherLists) { + purged += watcherList.allWatchers().stream().map(Watchers::purgeCompleted).mapToInt(i -> i).sum(); Review Comment: This can be a bit simpler. `purged += watcherList.allWatchers().stream().mapToInt(Watchers::purgeCompleted).sum();` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
