This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 8b7753f7ecbb0234b19ea1972ca8ac4574b06678
Author: Blake Eggleston <[email protected]>
AuthorDate: Tue Sep 2 12:35:59 2025 -0700

    Single-partition read fixes:
    - filter augmenting mutations and add comment explaining why we don't exclude reconciled mutations when augmenting
    - propagate exceptions during acknowledgeReconcile
    - fix UnreconciledMutations handling of single key bounds
    - fix single offset remove bug
    
    patch by Blake Eggleston; reviewed by Caleb Rackliffe for CASSANDRA-20830
---
 .../cassandra/replication/Log2OffsetsMap.java      |  29 ++
 .../org/apache/cassandra/replication/Offsets.java  |   6 +-
 .../replication/UnreconciledMutations.java         |  15 +-
 .../service/reads/tracked/PartialTrackedRead.java  |   9 +
 .../tracked/PartialTrackedSinglePartitionRead.java |   8 +-
 .../service/reads/tracked/ReadReconciliations.java |  14 +
 .../service/reads/tracked/TrackedLocalReads.java   |  14 +-
 .../apache/cassandra/replication/OffsetsTest.java  |  22 ++
 .../replication/UnreconciledMutationsTest.java     | 429 +++++++++++++++++++++
 9 files changed, 538 insertions(+), 8 deletions(-)

diff --git a/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java b/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java
index f7b03b15c5..8e3b2e1bc1 100644
--- a/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java
+++ b/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.replication;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.Map;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
@@ -55,6 +56,34 @@ public abstract class Log2OffsetsMap<T extends Offsets> implements Iterable<Shor
         return count;
     }
 
+    @Override
+    public String toString()
+    {
+
+        StringBuilder builder = new StringBuilder("Log2OffsetsMap{");
+        boolean isFirst = true;
+        for (Map.Entry<Long, T> entry : asMap().entrySet())
+        {
+            if (!isFirst)
+                builder.append(", ");
+
+            CoordinatorLogId logId = new CoordinatorLogId(entry.getKey());
+            T offsets = entry.getValue();
+            builder.append(logId);
+            builder.append("->");
+            builder.append(offsets);
+            isFirst = false;
+        }
+        for (T offsets : offsets())
+        {
+            if (!isFirst)
+                builder.append(", ");
+            builder.append(offsets);
+            isFirst = false;
+        }
+        return builder.append('}').toString();
+    }
+
     public boolean isEmpty()
     {
         return Iterables.all(offsets(), Offsets::isEmpty);
diff --git a/src/java/org/apache/cassandra/replication/Offsets.java b/src/java/org/apache/cassandra/replication/Offsets.java
index bd7204fc8a..fc080452a7 100644
--- a/src/java/org/apache/cassandra/replication/Offsets.java
+++ b/src/java/org/apache/cassandra/replication/Offsets.java
@@ -354,9 +354,11 @@ public abstract class Offsets implements Iterable<ShortMutationId>
                 if (pos - 1 >= 0 && bounds[pos - 1] == offset)
                     pos--; // normalize pos to always point to floor if this 
is a single-offset range
 
-                if (bounds[pos] == bounds[pos + 1]) // remove entire 
single-offset range
+                int rangeStart = pos - (pos % 2);
+
+                if (bounds[rangeStart] == bounds[rangeStart + 1]) // remove 
entire single-offset range
                 {
-                    System.arraycopy(bounds, pos + 2, bounds, pos, size - pos 
- 2);
+                    System.arraycopy(bounds, rangeStart + 2, bounds, 
rangeStart, size - rangeStart - 2);
                     bounds[--size] = 0;
                     bounds[--size] = 0;
                 }
diff --git a/src/java/org/apache/cassandra/replication/UnreconciledMutations.java b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
index 740882b739..5a6e9d1f55 100644
--- a/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
+++ b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
@@ -170,8 +170,19 @@ public class UnreconciledMutations
         int cmp = start.token.compareTo(end.token);
         if (cmp == 0)
         {
-            // full range
-            return collect(statesSet, tableId, includePending, into);
+            // When start and end tokens are equal, check if this is a 
single-token range
+            // Single-token ranges have start inclusive (offset=0) and end 
inclusive (offset=MAX_VALUE)
+            if (start.offset == 0 && end.offset == Integer.MAX_VALUE)
+            {
+                // Single token range - collect only mutations for this 
specific token
+                SortedSet<Entry> subset = 
statesSet.subSet(Entry.start(start.token, true), Entry.end(end.token, true));
+                return collect(subset, tableId, includePending, into);
+            }
+            else
+            {
+                // Full range - collect all mutations
+                return collect(statesSet, tableId, includePending, into);
+            }
         }
         else if (cmp > 0)
         {
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java
index cd3b137b6f..c4db7ce259 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java
@@ -35,9 +35,13 @@ import org.apache.cassandra.replication.Log2OffsetsMap;
 import org.apache.cassandra.replication.MutationJournal;
 import org.apache.cassandra.replication.ShortMutationId;
 import org.apache.cassandra.transport.Dispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public interface PartialTrackedRead
 {
+    Logger logger = LoggerFactory.getLogger(PartialTrackedRead.class);
+    
     interface CompletedRead extends AutoCloseable
     {
         TrackedDataResponse response(); // must be called from the read stage
@@ -95,6 +99,11 @@ public interface PartialTrackedRead
     {
         Mutation mutation = MutationJournal.instance.read(mutationId);
         Preconditions.checkNotNull(mutation);
+        if (!command().selectsKey(mutation.key()))
+        {
+            logger.trace("Skipping mutation {} - {} not in read range", 
mutationId, mutation.key());
+            return;
+        }
         augment(mutation);
     }
 
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java
index 368a4f0bfd..895706f056 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.service.reads.tracked;
 
 import java.util.List;
 
-import com.google.common.base.Preconditions;
-
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadExecutionController;
@@ -78,7 +76,11 @@ public class PartialTrackedSinglePartitionRead extends AbstractPartialTrackedRea
         @Override
         public State augment(PartitionUpdate update)
         {
-            
Preconditions.checkArgument(update.partitionKey().equals(command.partitionKey()));
+            if (!update.partitionKey().equals(command.partitionKey()))
+                throw new IllegalArgumentException(String.format("Received 
update for partition key %s but command was for %s",
+                                                                 
update.partitionKey(),
+                                                                 
command.partitionKey()));
+
             if (augmentedData == null)
                 augmentedData = new 
SimpleBTreePartition(command.partitionKey(), command.metadata(), 
UpdateTransaction.NO_OP);
 
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java
index 17851a24e3..1c57781c00 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java
@@ -263,6 +263,20 @@ public class ReadReconciliations implements ExpiredStatePurger.Expireable
 
         /**
          * Remote summaries minus data node summary offsets
+         * 
+         * This calculation combines BOTH reconciled and unreconciled 
mutations reported by other nodes, and
+         * then subtracts mutations reported locally for correctness
+         *
+         * If we subtracted reconciled ids from the unreconciled ids, we could 
violate read monotonicity in this scenario:
+         * 1. Read starts locally and doesn't see mutation M.
+         * 2. During reconciliation, mutation M arrives and is marked 
reconciled, other replicas report mutation M as reconciled
+         * 3. If we filtered out reconciled mutations, this read wouldn't 
augment with M
+         * 4. A concurrent read could see M in its initial data
+         * 5. This read returns without M
+         * 
+         * Instead, we include all mutations and rely on token range filtering 
during actual mutation
+         * retrieval (in PartialTrackedRead.augment) to ensure we only augment 
with mutations 
+         * relevant to this read's range/key
          */
         private Log2OffsetsMap.Mutable augmentingOffsets()
         {
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
index ab705a6346..066ebc22e0 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
@@ -222,7 +222,19 @@ public class TrackedLocalReads implements ExpiredStatePurger.Expireable
         void acknowledgeReconcile(Log2OffsetsMap<?> augmentingOffsets)
         {
             logger.trace("Reconciliation completed for {}, missing {}", 
readId, augmentingOffsets);
-            Stage.READ.submit(() -> { read.augment(augmentingOffsets); 
complete(); });
+
+            Stage.READ.submit(() -> { 
+
+                try
+                {
+                    read.augment(augmentingOffsets);
+                    complete();
+                } catch (Throwable t) {
+                    logger.error("Exception thrown during read completion", t);
+                    promise.tryFailure(t);
+                    throw t;
+                }
+            });
         }
 
         private void complete()
diff --git a/test/unit/org/apache/cassandra/replication/OffsetsTest.java b/test/unit/org/apache/cassandra/replication/OffsetsTest.java
index 5d10ae5e45..61c0a9ba5f 100644
--- a/test/unit/org/apache/cassandra/replication/OffsetsTest.java
+++ b/test/unit/org/apache/cassandra/replication/OffsetsTest.java
@@ -838,4 +838,26 @@ public class OffsetsTest
         from.remove(toRemove);
         assertOffsetsEqual(expectedAfter, from);
     }
+
+    @Test
+    public void testRemoveWithExactSizedArray()
+    {
+        {
+            Offsets.Mutable offsets = new Offsets.Mutable(LOG_ID, new int[] 
{10, 11}, 2);
+            offsets.remove(10);
+            assertOffsetsEqual(offsets(11, 11), offsets);
+        }
+
+        {
+            Offsets.Mutable offsets = new Offsets.Mutable(LOG_ID, new int[] 
{10, 11}, 2);
+            offsets.remove(11);
+            assertOffsetsEqual(offsets(10, 10), offsets);
+        }
+
+        {
+            Offsets.Mutable offsets = new Offsets.Mutable(LOG_ID, new int[] 
{11, 11}, 2);
+            offsets.remove(11);
+            assertOffsetsEqual(offsets(), offsets);
+        }
+    }
 }
diff --git a/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java b/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java
new file mode 100644
index 0000000000..dfa025a327
--- /dev/null
+++ b/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class UnreconciledMutationsTest
+{
+    private static final String KEYSPACE = "ks";
+    private static final String TABLE = "tbl";
+
+    @BeforeClass
+    public static void setUp() throws IOException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(3),
+                                    TableMetadata.builder(KEYSPACE, TABLE)
+                                                 .addPartitionKeyColumn("k", 
Int32Type.instance)
+                                                 .addRegularColumn("v", 
Int32Type.instance)
+                                                 .build());
+    }
+
+    private static Token tokenFor(int key)
+    {
+        return new 
ByteOrderedPartitioner.BytesToken(ByteBufferUtil.bytes(key));
+    }
+
+    private static Mutation createMutation(int partitionKey, int value, int 
offset)
+    {
+        TableMetadata metadata = Schema.instance.getTableMetadata(KEYSPACE, 
TABLE);
+        
+        // Create a MutationId with logId 1L and sequenceId that produces the 
desired offset
+        long logId = 1L;
+        long sequenceId = ((long) offset << 32) | 1000; // offset in high 
bits, timestamp in low bits
+        MutationId mutationId = new MutationId(logId, sequenceId);
+        
+        Mutation mutation = new RowUpdateBuilder(metadata, 0, partitionKey)
+                           .add("v", value)
+                           .build()
+                           .withMutationId(mutationId);
+        return mutation;
+    }
+
+    private static void addToOffsets(Offsets.OffsetReciever receiver, int... 
offsets)
+    {
+        for (int offset : offsets)
+            receiver.add(offset);
+    }
+
+    @Test
+    public void testSingleTokenCollectionIsolation()
+    {
+        UnreconciledMutations unreconciled = new UnreconciledMutations();
+        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
+        
+        // Create mutations for different partition keys with different tokens
+        Mutation mutation1 = createMutation(100, 1000, 1);
+        Mutation mutation2 = createMutation(200, 2000, 2);
+        Mutation mutation3 = createMutation(300, 3000, 3);
+        
+        Token token1 = mutation1.key().getToken();
+        Token token2 = mutation2.key().getToken();
+        Token token3 = mutation3.key().getToken();
+        
+        // Add all mutations to unreconciled state
+        unreconciled.startWriting(mutation1);
+        unreconciled.startWriting(mutation2);
+        unreconciled.startWriting(mutation3);
+        
+        // Make them visible
+        unreconciled.finishWriting(mutation1);
+        unreconciled.finishWriting(mutation2);
+        unreconciled.finishWriting(mutation3);
+        
+        // Test single token collection for token1 - should ONLY return 
mutation1
+        Offsets.Mutable offsets1 = new Offsets.Mutable(new CoordinatorLogId(1, 
1));
+        boolean found1 = unreconciled.collect(token1, tableId, false, 
offsets1);
+        
+        Assert.assertTrue("Should find mutations for token1", found1);
+        Assert.assertEquals("Should only contain 1 offset for token1", 1, 
offsets1.offsetCount());
+        Assert.assertTrue("Should contain offset 1 for mutation1", 
offsets1.contains(1));
+        Assert.assertFalse("Should NOT contain offset 2 for mutation2", 
offsets1.contains(2));
+        Assert.assertFalse("Should NOT contain offset 3 for mutation3", 
offsets1.contains(3));
+        
+        // Test single token collection for token2 - should ONLY return 
mutation2
+        Offsets.Mutable offsets2 = new Offsets.Mutable(new CoordinatorLogId(1, 
1));
+        boolean found2 = unreconciled.collect(token2, tableId, false, 
offsets2);
+        
+        Assert.assertTrue("Should find mutations for token2", found2);
+        Assert.assertEquals("Should only contain 1 offset for token2", 1, 
offsets2.offsetCount());
+        Assert.assertTrue("Should contain offset 2 for mutation2", 
offsets2.contains(2));
+        Assert.assertFalse("Should NOT contain offset 1 for mutation1", 
offsets2.contains(1));
+        Assert.assertFalse("Should NOT contain offset 3 for mutation3", 
offsets2.contains(3));
+        
+        // Test single token collection for token3 - should ONLY return 
mutation3
+        Offsets.Mutable offsets3 = new Offsets.Mutable(new CoordinatorLogId(1, 
1));
+        boolean found3 = unreconciled.collect(token3, tableId, false, 
offsets3);
+        
+        Assert.assertTrue("Should find mutations for token3", found3);
+        Assert.assertEquals("Should only contain 1 offset for token3", 1, 
offsets3.offsetCount());
+        Assert.assertTrue("Should contain offset 3 for mutation3", 
offsets3.contains(3));
+        Assert.assertFalse("Should NOT contain offset 1 for mutation1", 
offsets3.contains(1));
+        Assert.assertFalse("Should NOT contain offset 2 for mutation2", 
offsets3.contains(2));
+    }
+
+    @Test
+    public void testEmptyCollection()
+    {
+        UnreconciledMutations unreconciled = new UnreconciledMutations();
+        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
+        Token token = tokenFor(100);
+        
+        Offsets.Mutable offsets = new Offsets.Mutable(new CoordinatorLogId(1, 
1));
+        boolean found = unreconciled.collect(token, tableId, false, offsets);
+        
+        Assert.assertFalse("Should not find any mutations in empty 
collection", found);
+        Assert.assertTrue("Should have no offsets", offsets.isEmpty());
+    }
+
+    @Test
+    public void testPendingVsVisibleMutations()
+    {
+        UnreconciledMutations unreconciled = new UnreconciledMutations();
+        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
+        
+        Mutation pendingMutation = createMutation(100, 1000, 1);
+        Mutation visibleMutation = createMutation(100, 2000, 2);
+        Token token = pendingMutation.key().getToken();
+        
+        // Add one pending, one visible
+        unreconciled.startWriting(pendingMutation);
+        unreconciled.startWriting(visibleMutation);
+        unreconciled.finishWriting(visibleMutation); // Only make the second 
visible
+        
+        // Test without including pending - should only get visible mutation
+        Offsets.Mutable visibleOnly = new Offsets.Mutable(new 
CoordinatorLogId(1, 1));
+        boolean foundVisible = unreconciled.collect(token, tableId, false, 
visibleOnly);
+        
+        Assert.assertTrue("Should find visible mutations", foundVisible);
+        Assert.assertEquals("Should only have 1 visible mutation", 1, 
visibleOnly.offsetCount());
+        Assert.assertTrue("Should contain visible mutation offset", 
visibleOnly.contains(2));
+        Assert.assertFalse("Should NOT contain pending mutation offset", 
visibleOnly.contains(1));
+        
+        // Test including pending - should get both mutations
+        Offsets.Mutable includingPending = new Offsets.Mutable(new 
CoordinatorLogId(1, 1));
+        boolean foundAll = unreconciled.collect(token, tableId, true, 
includingPending);
+        
+        Assert.assertTrue("Should find all mutations", foundAll);
+        Assert.assertEquals("Should have 2 mutations total", 2, 
includingPending.offsetCount());
+        Assert.assertTrue("Should contain pending mutation offset", 
includingPending.contains(1));
+        Assert.assertTrue("Should contain visible mutation offset", 
includingPending.contains(2));
+    }
+
+    @Test
+    public void testMultipleMutationsSameToken()
+    {
+        UnreconciledMutations unreconciled = new UnreconciledMutations();
+        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
+        
+        // Create multiple mutations for the same partition key (same token)
+        Mutation mutation1 = createMutation(100, 1000, 1);
+        Mutation mutation2 = createMutation(100, 2000, 2);
+        Mutation mutation3 = createMutation(100, 3000, 3);
+        Token token = mutation1.key().getToken();
+        
+        unreconciled.startWriting(mutation1);
+        unreconciled.startWriting(mutation2);
+        unreconciled.startWriting(mutation3);
+        
+        unreconciled.finishWriting(mutation1);
+        unreconciled.finishWriting(mutation2);
+        unreconciled.finishWriting(mutation3);
+        
+        Offsets.Mutable offsets = new Offsets.Mutable(new CoordinatorLogId(1, 
1));
+        boolean found = unreconciled.collect(token, tableId, false, offsets);
+        
+        Assert.assertTrue("Should find mutations for token", found);
+        Assert.assertEquals("Should have all 3 mutations for same token", 3, 
offsets.offsetCount());
+        Assert.assertTrue("Should contain offset 1", offsets.contains(1));
+        Assert.assertTrue("Should contain offset 2", offsets.contains(2));
+        Assert.assertTrue("Should contain offset 3", offsets.contains(3));
+    }
+
+    @Test
+    public void testTableIdFiltering()
+    {
+        UnreconciledMutations unreconciled = new UnreconciledMutations();
+        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
+        
+        // Create a fake different table ID
+        TableId differentTableId = TableId.generate();
+        
+        Mutation mutation = createMutation(100, 1000, 1);
+        Token token = mutation.key().getToken();
+        
+        unreconciled.startWriting(mutation);
+        unreconciled.finishWriting(mutation);
+        
+        // Query with correct table ID - should find mutation
+        Offsets.Mutable correctTable = new Offsets.Mutable(new 
CoordinatorLogId(1, 1));
+        boolean foundCorrect = unreconciled.collect(token, tableId, false, 
correctTable);
+        
+        Assert.assertTrue("Should find mutation for correct table", 
foundCorrect);
+        Assert.assertEquals("Should have 1 mutation", 1, 
correctTable.offsetCount());
+        
+        // Query with different table ID - should find nothing
+        Offsets.Mutable differentTable = new Offsets.Mutable(new 
CoordinatorLogId(1, 1));
+        boolean foundDifferent = unreconciled.collect(token, differentTableId, 
false, differentTable);
+        
+        Assert.assertFalse("Should not find mutation for different table", 
foundDifferent);
+        Assert.assertTrue("Should have no mutations", 
differentTable.isEmpty());
+    }
+
+    @Test
+    public void testMutationRemoval()
+    {
+        UnreconciledMutations unreconciled = new UnreconciledMutations();
+        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
+        
+        Mutation mutation = createMutation(100, 1000, 1);
+        Token token = mutation.key().getToken();
+        
+        unreconciled.startWriting(mutation);
+        unreconciled.finishWriting(mutation);
+        
+        // Verify mutation is present
+        Offsets.Mutable beforeRemoval = new Offsets.Mutable(new 
CoordinatorLogId(1, 1));
+        boolean foundBefore = unreconciled.collect(token, tableId, false, 
beforeRemoval);
+        Assert.assertTrue("Should find mutation before removal", foundBefore);
+        Assert.assertEquals("Should have 1 mutation before removal", 1, 
beforeRemoval.offsetCount());
+        
+        // Remove the mutation
+        unreconciled.remove(1);
+        
+        // Verify mutation is gone
+        Offsets.Mutable afterRemoval = new Offsets.Mutable(new 
CoordinatorLogId(1, 1));
+        boolean foundAfter = unreconciled.collect(token, tableId, false, 
afterRemoval);
+        Assert.assertFalse("Should not find mutation after removal", 
foundAfter);
+        Assert.assertTrue("Should have no mutations after removal", 
afterRemoval.isEmpty());
+    }
+
+    @Test
+    public void testTokenRangeCollection()
+    {
+        UnreconciledMutations unreconciled = new UnreconciledMutations();
+        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
+        
+        // Create mutations with different tokens and sort them
+        List<Integer> keys = List.of(100, 200, 300, 400, 500);
+        List<Mutation> mutations = new ArrayList<>();
+        for (int i = 0; i < keys.size(); i++)
+        {
+            Mutation mutation = createMutation(keys.get(i), keys.get(i) * 10, 
i + 1);
+            mutations.add(mutation);
+            unreconciled.startWriting(mutation);
+            unreconciled.finishWriting(mutation);
+        }
+        
+        // Sort mutations by token for predictable range testing
+        mutations.sort((m1, m2) -> 
m1.key().getToken().compareTo(m2.key().getToken()));
+        
+        Token firstToken = mutations.get(0).key().getToken();
+        Token middleToken = mutations.get(2).key().getToken();
+        Token lastToken = mutations.get(4).key().getToken();
+        
+        // Test range from first to middle (inclusive)
+        DecoratedKey startKey = mutations.get(0).key();
+        DecoratedKey endKey = mutations.get(2).key();
+        Range<Token> range = 
+            new Range<>(firstToken, middleToken);
+        AbstractBounds<PartitionPosition> bounds = 
+            new Bounds<>(startKey, endKey);
+        
+        Offsets.Mutable rangeOffsets = new Offsets.Mutable(new 
CoordinatorLogId(1, 1));
+        boolean foundRange = unreconciled.collect(bounds, tableId, false, 
rangeOffsets);
+        
+        Assert.assertTrue("Should find mutations in range", foundRange);
+        // Should include mutations at positions 0, 1, and 2 (first through 
middle inclusive)
+        Assert.assertTrue("Range should be inclusive of boundaries", 
rangeOffsets.offsetCount() >= 3);
+    }
+
+    @Test
+    public void testSingleTokenRangeCollectionBug()
+    {
+        UnreconciledMutations unreconciled = new UnreconciledMutations();
+        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
+        
+        // Create mutations for different partition keys with different tokens
+        Mutation mutation1 = createMutation(100, 1000, 1);
+        Mutation mutation2 = createMutation(200, 2000, 2);
+        Mutation mutation3 = createMutation(300, 3000, 3);
+        
+        // Add all mutations to unreconciled state
+        unreconciled.startWriting(mutation1);
+        unreconciled.startWriting(mutation2);
+        unreconciled.startWriting(mutation3);
+        
+        // Make them visible
+        unreconciled.finishWriting(mutation1);
+        unreconciled.finishWriting(mutation2);
+        unreconciled.finishWriting(mutation3);
+        
+        // Test single token range collection
+        Offsets.Mutable offsets = new Offsets.Mutable(new CoordinatorLogId(1, 
1));
+        Bounds<PartitionPosition> bounds = new Bounds<>(mutation1.key(), 
mutation1.key());
+        boolean found = unreconciled.collect(bounds, tableId, false, offsets);
+        
+        // This should only return mutation1
+        Assert.assertTrue("Should find mutations for single token range", 
found);
+        Assert.assertEquals("Single token range should only return 1 
mutation", 1, offsets.offsetCount());
+        Assert.assertTrue("Should contain mutation1 offset", 
offsets.contains(1));
+
+        // other mutations should not be included
+        Assert.assertFalse("Single token range collection should NOT contain 
mutation2", offsets.contains(2));
+        Assert.assertFalse("Single token range collection should NOT contain 
mutation3", offsets.contains(3));
+    }
+
+    @Test
+    public void testFullRangeCollectionWithMinimumToken()
+    {
+        UnreconciledMutations unreconciled = new UnreconciledMutations();
+        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
+        
+        // Create mutations for different partition keys with different tokens
+        Mutation mutation1 = createMutation(100, 1000, 1);
+        Mutation mutation2 = createMutation(200, 2000, 2);
+        Mutation mutation3 = createMutation(300, 3000, 3);
+        
+        // Add all mutations to unreconciled state
+        unreconciled.startWriting(mutation1);
+        unreconciled.startWriting(mutation2);
+        unreconciled.startWriting(mutation3);
+        
+        // Make them visible
+        unreconciled.finishWriting(mutation1);
+        unreconciled.finishWriting(mutation2);
+        unreconciled.finishWriting(mutation3);
+        
+        // Create a full-range query using minimum token pattern (same 
start/end with minimum token)
+        // This is the legitimate case where cmp == 0 should mean "full range"
+        Token minimumToken = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
+        Range<PartitionPosition> fullRange = new 
Range<>(minimumToken.minKeyBound(), minimumToken.minKeyBound());
+
+        // Test full range collection - this SHOULD return ALL mutations
+        Offsets.Mutable offsets = new Offsets.Mutable(new CoordinatorLogId(1, 
1));
+        boolean found = unreconciled.collect(fullRange, tableId, false, 
offsets);
+        
+        // Full range should return ALL mutations
+        Assert.assertTrue("Should find mutations for full range", found);
+        Assert.assertEquals("Full range should return ALL mutations", 3, 
offsets.offsetCount());
+        Assert.assertTrue("Should contain mutation1 offset", 
offsets.contains(1));
+        Assert.assertTrue("Should contain mutation2 offset", 
offsets.contains(2));
+        Assert.assertTrue("Should contain mutation3 offset", 
offsets.contains(3));
+    }
+
+    @Test
+    public void testSingleTokenCollectionVsFullRange()
+    {
+        UnreconciledMutations unreconciled = new UnreconciledMutations();
+        TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id;
+        
+        // Create mutations with different tokens
+        Mutation mutation1 = createMutation(100, 1000, 1);
+        Mutation mutation2 = createMutation(200, 2000, 2);
+        Mutation mutation3 = createMutation(300, 3000, 3);
+        
+        Token targetToken = mutation1.key().getToken();
+        
+        unreconciled.startWriting(mutation1);
+        unreconciled.startWriting(mutation2);
+        unreconciled.startWriting(mutation3);
+        
+        unreconciled.finishWriting(mutation1);
+        unreconciled.finishWriting(mutation2);
+        unreconciled.finishWriting(mutation3);
+        
+        // Single token collection should only return mutation1
+        Offsets.Mutable singleToken = new Offsets.Mutable(new 
CoordinatorLogId(1, 1));
+        boolean foundSingle = unreconciled.collect(targetToken, tableId, 
false, singleToken);
+        
+        Assert.assertTrue("Should find mutation for single token", 
foundSingle);
+        Assert.assertEquals("Single token should only return 1 mutation", 1, 
singleToken.offsetCount());
+        Assert.assertTrue("Should contain mutation1 offset", 
singleToken.contains(1));
+        
+        // This is the CRITICAL test - single token collection must NOT return 
all mutations
+        // If the bug exists, this test will fail because it returns all 3 
mutations instead of 1
+        Assert.assertFalse("Single token collection should NOT contain 
mutation2", singleToken.contains(2));
+        Assert.assertFalse("Single token collection should NOT contain 
mutation3", singleToken.contains(3));
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to