This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch cep-15-accord-stabilize-tests in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit a8decd72ae9cc401cf4166000586be7752ba144a Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Tue Oct 1 10:34:42 2024 +0100 Fix timeout deadlock --- .../java/accord/impl/DefaultRequestTimeouts.java | 45 +++++++++++++++++----- .../src/main/java/accord/utils/ArrayBuffers.java | 37 ++++++++++++++++++ 2 files changed, 73 insertions(+), 9 deletions(-) diff --git a/accord-core/src/main/java/accord/impl/DefaultRequestTimeouts.java b/accord-core/src/main/java/accord/impl/DefaultRequestTimeouts.java index 8f11e5f1..d278eafd 100644 --- a/accord-core/src/main/java/accord/impl/DefaultRequestTimeouts.java +++ b/accord-core/src/main/java/accord/impl/DefaultRequestTimeouts.java @@ -24,6 +24,8 @@ import java.util.function.Function; import accord.api.RequestTimeouts; import accord.local.Node; +import accord.utils.ArrayBuffers; +import accord.utils.ArrayBuffers.BufferList; import accord.utils.LogGroupTimers; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -91,16 +93,41 @@ public class DefaultRequestTimeouts implements RequestTimeouts @Override public void run() { - lock.lock(); - try + try (BufferList<Registered> collect = new BufferList<>()) { - long now = node.elapsed(MILLISECONDS); - // TODO (expected): should we handle reentrancy? Or at least throw an exception? - timeouts.advance(now, this, (s, r) -> r.timeout.timeout()); - } - finally - { - lock.unlock(); + int i = 0; + try + { + lock.lock(); + try + { + long now = node.elapsed(MILLISECONDS); + // TODO (expected): should we handle reentrancy? Or at least throw an exception? + timeouts.advance(now, collect, BufferList::add); + } + finally + { + lock.unlock(); + } + + while (i < collect.size()) + collect.get(i++).timeout.timeout(); + } + catch (Throwable t) + { + while (i < collect.size()) + { + try + { + collect.get(i++).timeout.timeout(); + } + catch (Throwable t2) + { + t.addSuppressed(t2); + } + } + throw t; + } } } diff --git a/accord-core/src/main/java/accord/utils/ArrayBuffers.java b/accord-core/src/main/java/accord/utils/ArrayBuffers.java index 388c5e2c..5973c37b 100644 --- a/accord-core/src/main/java/accord/utils/ArrayBuffers.java +++ b/accord-core/src/main/java/accord/utils/ArrayBuffers.java @@ -23,8 +23,11 @@ import accord.api.RoutingKey; import accord.primitives.Range; import accord.primitives.TxnId; +import java.io.Closeable; import java.lang.reflect.Array; +import java.util.AbstractList; import java.util.Arrays; +import java.util.List; import java.util.function.IntFunction; import static accord.utils.Invariants.illegalState; @@ -783,4 +786,38 @@ public class ArrayBuffers } } + public static class BufferList<E> extends AbstractList<E> implements Closeable + { + private static final Object[] EMPTY = new Object[0]; + private Object[] buffer = EMPTY; + private int size; + + @Override + public E get(int index) + { + return (E) buffer[index]; + } + + @Override + public int size() + { + return size; + } + + @Override + public boolean add(E e) + { + if (size == buffer.length) + buffer = cachedAny().resize(buffer, size, Math.min(8, size * 2)); + buffer[size++] = e; + return true; + } + + public void close() + { + if (buffer == null) return; + cachedAny().forceDiscard(buffer, size); + buffer = null; + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org