This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new 34232d7bd4 IndexOutOfBoundsException while serializing CommandsForKey 34232d7bd4 is described below commit 34232d7bd45761a1c14c7e91d2f8e5ae183bc8e3 Author: David Capwell <dcapw...@apache.org> AuthorDate: Fri May 17 13:50:01 2024 -0700 IndexOutOfBoundsException while serializing CommandsForKey patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-19642 --- modules/accord | 2 +- .../serializers/CommandsForKeySerializer.java | 29 ++++++++++-------- .../serializers/CommandsForKeySerializerTest.java | 35 ++++++++++++++++++++-- 3 files changed, 50 insertions(+), 16 deletions(-) diff --git a/modules/accord b/modules/accord index d63d06aafe..21cdaf5d28 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit d63d06aafe2e60e57a9651ff6dd491175bbe6916 +Subproject commit 21cdaf5d280965cfdc690d385375635b498bc9f9 diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java index dbe2f4845f..a81b62b4a3 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java @@ -385,15 +385,18 @@ public class CommandsForKeySerializer VIntCoding.writeUnsignedVInt32(unmanagedPendingCommitCount, out); VIntCoding.writeUnsignedVInt32(cfk.unmanagedCount() - unmanagedPendingCommitCount, out); Unmanaged.Pending pending = unmanagedPendingCommitCount == 0 ? Unmanaged.Pending.APPLY : Unmanaged.Pending.COMMIT; - for (int i = 0 ; i < cfk.unmanagedCount() ; ++i) { - Unmanaged unmanaged = cfk.getUnmanaged(i); - Invariants.checkState(unmanaged.pending == pending); - CommandSerializers.txnId.serialize(unmanaged.txnId, out, ByteBufferAccessor.instance, out.position()); - out.position(out.position() + CommandSerializers.txnId.serializedSize()); - CommandSerializers.timestamp.serialize(unmanaged.waitingUntil, out, ByteBufferAccessor.instance, out.position()); - out.position(out.position() + CommandSerializers.timestamp.serializedSize()); - if (--unmanagedPendingCommitCount == 0) pending = Unmanaged.Pending.APPLY; + int offset = 0; + for (int i = 0 ; i < cfk.unmanagedCount() ; ++i) + { + Unmanaged unmanaged = cfk.getUnmanaged(i); + Invariants.checkState(unmanaged.pending == pending); + + offset += CommandSerializers.txnId.serialize(unmanaged.txnId, out, ByteBufferAccessor.instance, offset); + offset += CommandSerializers.timestamp.serialize(unmanaged.waitingUntil, out, ByteBufferAccessor.instance, offset); + if (--unmanagedPendingCommitCount == 0) pending = Unmanaged.Pending.APPLY; + } + out.position(out.position() + offset); } if ((executeAtCount | missingIdCount) > 0) @@ -610,15 +613,17 @@ public class CommandsForKeySerializer { unmanageds = new Unmanaged[unmanagedCount]; Unmanaged.Pending pending = unmanagedPendingCommitCount == 0 ? Unmanaged.Pending.APPLY : Unmanaged.Pending.COMMIT; + int offset = 0; for (int i = 0 ; i < unmanagedCount ; ++i) { - TxnId txnId = CommandSerializers.txnId.deserialize(in, ByteBufferAccessor.instance, in.position()); - in.position(in.position() + CommandSerializers.txnId.serializedSize()); - Timestamp waitingUntil = CommandSerializers.timestamp.deserialize(in, ByteBufferAccessor.instance, in.position()); - in.position(in.position() + CommandSerializers.timestamp.serializedSize()); + TxnId txnId = CommandSerializers.txnId.deserialize(in, ByteBufferAccessor.instance, offset); + offset += CommandSerializers.txnId.serializedSize(); + Timestamp waitingUntil = CommandSerializers.timestamp.deserialize(in, ByteBufferAccessor.instance, offset); + offset += CommandSerializers.timestamp.serializedSize(); unmanageds[i] = new Unmanaged(pending, txnId, waitingUntil); if (--unmanagedPendingCommitCount == 0) pending = Unmanaged.Pending.APPLY; } + in.position(in.position() + offset); } if (executeAtMasks + missingDepsMasks > 0) diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java index 405f92dc59..202dc5cb78 100644 --- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java @@ -33,6 +33,7 @@ import java.util.function.IntSupplier; import java.util.function.LongUnaryOperator; import java.util.function.Supplier; +import org.apache.commons.lang3.ArrayUtils; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -42,6 +43,7 @@ import accord.local.CommandsForKey; import accord.local.CommandsForKey.InternalStatus; import accord.local.Command; import accord.local.CommandsForKey.TxnInfo; +import accord.local.CommandsForKey.Unmanaged; import accord.local.CommonAttributes; import accord.local.CommonAttributes.Mutable; import accord.local.Listeners; @@ -59,6 +61,7 @@ import accord.primitives.Txn; import accord.primitives.TxnId; import accord.primitives.Writes; import accord.utils.AccordGens; +import accord.utils.Gen; import accord.utils.Gens; import accord.utils.RandomSource; import accord.utils.SortedArrays; @@ -465,12 +468,38 @@ public class CommandsForKeySerializerTest next = txnIdGen.next(rs0); return next; }).unique().ofSizeBetween(0, 10).next(rs); + Arrays.sort(ids, Comparator.naturalOrder()); TxnInfo[] info = new TxnInfo[ids.length]; for (int i = 0; i < info.length; i++) info[i] = TxnInfo.create(ids[i], rs.pick(InternalStatus.values()), ids[i], CommandsForKey.NO_TXNIDS); - Arrays.sort(info, Comparator.naturalOrder()); - CommandsForKey expected = CommandsForKey.SerializerSupport.create(pk, info, CommandsForKey.NO_PENDING_UNMANAGED); + Gen<Unmanaged.Pending> pendingGen = Gens.enums().allMixedDistribution(Unmanaged.Pending.class).next(rs); + + Unmanaged[] unmanaged = Gens.lists(txnIdGen) + .unique() + .ofSizeBetween(0, 10) + .map((rs0, txnIds) -> txnIds.stream().map(i -> new Unmanaged(pendingGen.next(rs0), i, i)).toArray(Unmanaged[]::new)) + .next(rs); + Arrays.sort(unmanaged, Comparator.naturalOrder()); + if (unmanaged.length > 0) + { + // when registering unmanaged, if the txn is "missing" in TxnInfo we add it + List<TxnInfo> missing = new ArrayList<>(unmanaged.length); + for (Unmanaged u : unmanaged) + { + int idx = Arrays.binarySearch(ids, u.txnId); + if (idx < 0) + missing.add(TxnInfo.create(u.txnId, InternalStatus.TRANSITIVELY_KNOWN)); + } + if (!missing.isEmpty()) + { + info = ArrayUtils.addAll(info, missing.toArray(TxnInfo[]::new)); + Arrays.sort(info, Comparator.naturalOrder()); + } + } + else unmanaged = CommandsForKey.NO_PENDING_UNMANAGED; + + CommandsForKey expected = CommandsForKey.SerializerSupport.create(pk, info, unmanaged); ByteBuffer buffer = CommandsForKeySerializer.toBytesWithoutKey(expected); CommandsForKey roundTrip = CommandsForKeySerializer.fromBytes(pk, buffer); @@ -493,4 +522,4 @@ public class CommandsForKeySerializerTest CommandsForKey roundTrip = CommandsForKeySerializer.fromBytes(pk, buffer); Assert.assertEquals(expected, roundTrip); } -} \ No newline at end of file +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org