This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 34232d7bd4 IndexOutOfBoundsException while serializing CommandsForKey
34232d7bd4 is described below

commit 34232d7bd45761a1c14c7e91d2f8e5ae183bc8e3
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Fri May 17 13:50:01 2024 -0700

    IndexOutOfBoundsException while serializing CommandsForKey
    
    patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-19642
---
 modules/accord                                     |  2 +-
 .../serializers/CommandsForKeySerializer.java      | 29 ++++++++++--------
 .../serializers/CommandsForKeySerializerTest.java  | 35 ++++++++++++++++++++--
 3 files changed, 50 insertions(+), 16 deletions(-)

diff --git a/modules/accord b/modules/accord
index d63d06aafe..21cdaf5d28 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit d63d06aafe2e60e57a9651ff6dd491175bbe6916
+Subproject commit 21cdaf5d280965cfdc690d385375635b498bc9f9
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
index dbe2f4845f..a81b62b4a3 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
@@ -385,15 +385,18 @@ public class CommandsForKeySerializer
             VIntCoding.writeUnsignedVInt32(unmanagedPendingCommitCount, out);
             VIntCoding.writeUnsignedVInt32(cfk.unmanagedCount() - 
unmanagedPendingCommitCount, out);
             Unmanaged.Pending pending = unmanagedPendingCommitCount == 0 ? 
Unmanaged.Pending.APPLY : Unmanaged.Pending.COMMIT;
-            for (int i = 0 ; i < cfk.unmanagedCount() ; ++i)
             {
-                Unmanaged unmanaged = cfk.getUnmanaged(i);
-                Invariants.checkState(unmanaged.pending == pending);
-                CommandSerializers.txnId.serialize(unmanaged.txnId, out, 
ByteBufferAccessor.instance, out.position());
-                out.position(out.position() + 
CommandSerializers.txnId.serializedSize());
-                CommandSerializers.timestamp.serialize(unmanaged.waitingUntil, 
out, ByteBufferAccessor.instance, out.position());
-                out.position(out.position() + 
CommandSerializers.timestamp.serializedSize());
-                if (--unmanagedPendingCommitCount == 0) pending = 
Unmanaged.Pending.APPLY;
+                int offset = 0;
+                for (int i = 0 ; i < cfk.unmanagedCount() ; ++i)
+                {
+                    Unmanaged unmanaged = cfk.getUnmanaged(i);
+                    Invariants.checkState(unmanaged.pending == pending);
+
+                    offset += 
CommandSerializers.txnId.serialize(unmanaged.txnId, out, 
ByteBufferAccessor.instance, offset);
+                    offset += 
CommandSerializers.timestamp.serialize(unmanaged.waitingUntil, out, 
ByteBufferAccessor.instance, offset);
+                    if (--unmanagedPendingCommitCount == 0) pending = 
Unmanaged.Pending.APPLY;
+                }
+                out.position(out.position() + offset);
             }
 
             if ((executeAtCount | missingIdCount) > 0)
@@ -610,15 +613,17 @@ public class CommandsForKeySerializer
         {
             unmanageds = new Unmanaged[unmanagedCount];
             Unmanaged.Pending pending = unmanagedPendingCommitCount == 0 ? 
Unmanaged.Pending.APPLY : Unmanaged.Pending.COMMIT;
+            int offset = 0;
             for (int i = 0 ; i < unmanagedCount ; ++i)
             {
-                TxnId txnId = CommandSerializers.txnId.deserialize(in, 
ByteBufferAccessor.instance, in.position());
-                in.position(in.position() + 
CommandSerializers.txnId.serializedSize());
-                Timestamp waitingUntil = 
CommandSerializers.timestamp.deserialize(in, ByteBufferAccessor.instance, 
in.position());
-                in.position(in.position() + 
CommandSerializers.timestamp.serializedSize());
+                TxnId txnId = CommandSerializers.txnId.deserialize(in, 
ByteBufferAccessor.instance, offset);
+                offset += CommandSerializers.txnId.serializedSize();
+                Timestamp waitingUntil = 
CommandSerializers.timestamp.deserialize(in, ByteBufferAccessor.instance, 
offset);
+                offset += CommandSerializers.timestamp.serializedSize();
                 unmanageds[i] = new Unmanaged(pending, txnId, waitingUntil);
                 if (--unmanagedPendingCommitCount == 0) pending = 
Unmanaged.Pending.APPLY;
             }
+            in.position(in.position() + offset);
         }
 
         if (executeAtMasks + missingDepsMasks > 0)
diff --git 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index 405f92dc59..202dc5cb78 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -33,6 +33,7 @@ import java.util.function.IntSupplier;
 import java.util.function.LongUnaryOperator;
 import java.util.function.Supplier;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -42,6 +43,7 @@ import accord.local.CommandsForKey;
 import accord.local.CommandsForKey.InternalStatus;
 import accord.local.Command;
 import accord.local.CommandsForKey.TxnInfo;
+import accord.local.CommandsForKey.Unmanaged;
 import accord.local.CommonAttributes;
 import accord.local.CommonAttributes.Mutable;
 import accord.local.Listeners;
@@ -59,6 +61,7 @@ import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
 import accord.utils.AccordGens;
+import accord.utils.Gen;
 import accord.utils.Gens;
 import accord.utils.RandomSource;
 import accord.utils.SortedArrays;
@@ -465,12 +468,38 @@ public class CommandsForKeySerializerTest
                     next = txnIdGen.next(rs0);
                 return next;
             }).unique().ofSizeBetween(0, 10).next(rs);
+            Arrays.sort(ids, Comparator.naturalOrder());
             TxnInfo[] info = new TxnInfo[ids.length];
             for (int i = 0; i < info.length; i++)
                 info[i] = TxnInfo.create(ids[i], 
rs.pick(InternalStatus.values()), ids[i], CommandsForKey.NO_TXNIDS);
-            Arrays.sort(info, Comparator.naturalOrder());
 
-            CommandsForKey expected = 
CommandsForKey.SerializerSupport.create(pk, info, 
CommandsForKey.NO_PENDING_UNMANAGED);
+            Gen<Unmanaged.Pending> pendingGen = 
Gens.enums().allMixedDistribution(Unmanaged.Pending.class).next(rs);
+
+            Unmanaged[] unmanaged = Gens.lists(txnIdGen)
+                                        .unique()
+                                        .ofSizeBetween(0, 10)
+                                        .map((rs0, txnIds) -> 
txnIds.stream().map(i -> new Unmanaged(pendingGen.next(rs0), i, 
i)).toArray(Unmanaged[]::new))
+                                        .next(rs);
+            Arrays.sort(unmanaged, Comparator.naturalOrder());
+            if (unmanaged.length > 0)
+            {
+                // when registering unmanaged, if the txn is "missing" in 
TxnInfo we add it
+                List<TxnInfo> missing = new ArrayList<>(unmanaged.length);
+                for (Unmanaged u : unmanaged)
+                {
+                    int idx = Arrays.binarySearch(ids, u.txnId);
+                    if (idx < 0)
+                        missing.add(TxnInfo.create(u.txnId, 
InternalStatus.TRANSITIVELY_KNOWN));
+                }
+                if (!missing.isEmpty())
+                {
+                    info = ArrayUtils.addAll(info, 
missing.toArray(TxnInfo[]::new));
+                    Arrays.sort(info, Comparator.naturalOrder());
+                }
+            }
+            else unmanaged = CommandsForKey.NO_PENDING_UNMANAGED;
+
+            CommandsForKey expected = 
CommandsForKey.SerializerSupport.create(pk, info, unmanaged);
 
             ByteBuffer buffer = 
CommandsForKeySerializer.toBytesWithoutKey(expected);
             CommandsForKey roundTrip = CommandsForKeySerializer.fromBytes(pk, 
buffer);
@@ -493,4 +522,4 @@ public class CommandsForKeySerializerTest
         CommandsForKey roundTrip = CommandsForKeySerializer.fromBytes(pk, 
buffer);
         Assert.assertEquals(expected, roundTrip);
     }
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to