This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord-stabilize-tests
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit a8decd72ae9cc401cf4166000586be7752ba144a
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Tue Oct 1 10:34:42 2024 +0100

    Fix timeout deadlock
---
 .../java/accord/impl/DefaultRequestTimeouts.java   | 45 +++++++++++++++++-----
 .../src/main/java/accord/utils/ArrayBuffers.java   | 37 ++++++++++++++++++
 2 files changed, 73 insertions(+), 9 deletions(-)

diff --git a/accord-core/src/main/java/accord/impl/DefaultRequestTimeouts.java b/accord-core/src/main/java/accord/impl/DefaultRequestTimeouts.java
index 8f11e5f1..d278eafd 100644
--- a/accord-core/src/main/java/accord/impl/DefaultRequestTimeouts.java
+++ b/accord-core/src/main/java/accord/impl/DefaultRequestTimeouts.java
@@ -24,6 +24,8 @@ import java.util.function.Function;
 
 import accord.api.RequestTimeouts;
 import accord.local.Node;
+import accord.utils.ArrayBuffers;
+import accord.utils.ArrayBuffers.BufferList;
 import accord.utils.LogGroupTimers;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -91,16 +93,41 @@ public class DefaultRequestTimeouts implements RequestTimeouts
         @Override
         public void run()
         {
-            lock.lock();
-            try
+            try (BufferList<Registered> collect = new BufferList<>()) // holds the entries handed over by advance(); closed when run() exits
             {
-                long now = node.elapsed(MILLISECONDS);
-                // TODO (expected): should we handle reentrancy? Or at least 
throw an exception?
-                timeouts.advance(now, this, (s, r) -> r.timeout.timeout());
-            }
-            finally
-            {
-                lock.unlock();
+                int i = 0; // index of the next collected entry whose callback has not been invoked yet
+                try
+                {
+                    lock.lock();
+                    try
+                    {
+                        long now = node.elapsed(MILLISECONDS);
+                        // TODO (expected): should we handle reentrancy? Or at 
least throw an exception?
+                        timeouts.advance(now, collect, BufferList::add); // while locked, entries are only added to collect; no callback runs here
+                    }
+                    finally
+                    {
+                        lock.unlock();
+                    }
+
+                    while (i < collect.size()) // lock already released: timeout callbacks run outside it
+                        collect.get(i++).timeout.timeout(); // i advances before the call, so an entry that throws is not invoked again below
+                }
+                catch (Throwable t)
+                {
+                    while (i < collect.size()) // after a failure, still invoke every remaining collected callback
+                    {
+                        try
+                        {
+                            collect.get(i++).timeout.timeout();
+                        }
+                        catch (Throwable t2)
+                        {
+                            t.addSuppressed(t2); // the first failure stays primary; later ones are attached to it
+                        }
+                    }
+                    throw t; // rethrown only after all remaining callbacks were attempted
+                }
             }
         }
 
diff --git a/accord-core/src/main/java/accord/utils/ArrayBuffers.java b/accord-core/src/main/java/accord/utils/ArrayBuffers.java
index 388c5e2c..5973c37b 100644
--- a/accord-core/src/main/java/accord/utils/ArrayBuffers.java
+++ b/accord-core/src/main/java/accord/utils/ArrayBuffers.java
@@ -23,8 +23,11 @@ import accord.api.RoutingKey;
 import accord.primitives.Range;
 import accord.primitives.TxnId;
 
+import java.io.Closeable;
 import java.lang.reflect.Array;
+import java.util.AbstractList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.function.IntFunction;
 
 import static accord.utils.Invariants.illegalState;
@@ -783,4 +786,38 @@ public class ArrayBuffers
         }
     }
 
+    public static class BufferList<E> extends AbstractList<E> implements Closeable // growable list over an array resized via cachedAny(); close() discards that array
+    {
+        private static final Object[] EMPTY = new Object[0]; // shared zero-length initial backing array
+        private Object[] buffer = EMPTY; // backing storage; set to null by close()
+        private int size; // number of elements added so far
+
+        @Override
+        public E get(int index)
+        {
+            return (E) buffer[index]; // unchecked cast; index is not validated against size
+        }
+
+        @Override
+        public int size()
+        {
+            return size;
+        }
+
+        @Override
+        public boolean add(E e)
+        {
+            if (size == buffer.length) // backing array is full (always the case on the first add, since EMPTY has length 0)
+                buffer = cachedAny().resize(buffer, size, Math.max(8, size * 2)); // request at least 8 slots, otherwise double; min() asked for 0 at first and never more than 8
+            buffer[size++] = e; // NOTE(review): relies on resize() returning an array of at least the requested length - confirm
+            return true;
+        }
+
+        public void close()
+        {
+            if (buffer == null) return; // already closed; a repeated close() is a no-op
+            cachedAny().forceDiscard(buffer, size); // NOTE(review): presumably hands the array back to the cache, treating the first size slots as used - confirm
+            buffer = null;
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to