This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cep-45-mutation-tracking in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 8b7753f7ecbb0234b19ea1972ca8ac4574b06678 Author: Blake Eggleston <[email protected]> AuthorDate: Tue Sep 2 12:35:59 2025 -0700 Single-partition read fixes: - filter augmenting mutations and add comment explaining why we don't exclude reconciled mutations when augmenting - propagate exceptions during acknowledgeReconcile - fix UnreconciledMutations handling of single key bounds - fix single offset remove bug patch by Blake Eggleston; reviewed by Caleb Rackliffe for CASSANDRA-20830 --- .../cassandra/replication/Log2OffsetsMap.java | 29 ++ .../org/apache/cassandra/replication/Offsets.java | 6 +- .../replication/UnreconciledMutations.java | 15 +- .../service/reads/tracked/PartialTrackedRead.java | 9 + .../tracked/PartialTrackedSinglePartitionRead.java | 8 +- .../service/reads/tracked/ReadReconciliations.java | 14 + .../service/reads/tracked/TrackedLocalReads.java | 14 +- .../apache/cassandra/replication/OffsetsTest.java | 22 ++ .../replication/UnreconciledMutationsTest.java | 429 +++++++++++++++++++++ 9 files changed, 538 insertions(+), 8 deletions(-) diff --git a/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java b/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java index f7b03b15c5..8e3b2e1bc1 100644 --- a/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java +++ b/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java @@ -21,6 +21,7 @@ package org.apache.cassandra.replication; import java.io.IOException; import java.util.Collection; import java.util.Iterator; +import java.util.Map; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; @@ -55,6 +56,34 @@ public abstract class Log2OffsetsMap<T extends Offsets> implements Iterable<Shor return count; } + @Override + public String toString() + { + + StringBuilder builder = new StringBuilder("Log2OffsetsMap{"); + boolean isFirst = true; + for (Map.Entry<Long, T> entry : asMap().entrySet()) + { + if (!isFirst) + builder.append(", "); + + CoordinatorLogId logId = new CoordinatorLogId(entry.getKey()); + T offsets = entry.getValue(); + builder.append(logId); + builder.append("->"); + builder.append(offsets); + isFirst = false; + } + for (T offsets : offsets()) + { + if (!isFirst) + builder.append(", "); + builder.append(offsets); + isFirst = false; + } + return builder.append('}').toString(); + } + public boolean isEmpty() { return Iterables.all(offsets(), Offsets::isEmpty); diff --git a/src/java/org/apache/cassandra/replication/Offsets.java b/src/java/org/apache/cassandra/replication/Offsets.java index bd7204fc8a..fc080452a7 100644 --- a/src/java/org/apache/cassandra/replication/Offsets.java +++ b/src/java/org/apache/cassandra/replication/Offsets.java @@ -354,9 +354,11 @@ public abstract class Offsets implements Iterable<ShortMutationId> if (pos - 1 >= 0 && bounds[pos - 1] == offset) pos--; // normalize pos to always point to floor if this is a single-offset range - if (bounds[pos] == bounds[pos + 1]) // remove entire single-offset range + int rangeStart = pos - (pos % 2); + + if (bounds[rangeStart] == bounds[rangeStart + 1]) // remove entire single-offset range { - System.arraycopy(bounds, pos + 2, bounds, pos, size - pos - 2); + System.arraycopy(bounds, rangeStart + 2, bounds, rangeStart, size - rangeStart - 2); bounds[--size] = 0; bounds[--size] = 0; } diff --git a/src/java/org/apache/cassandra/replication/UnreconciledMutations.java b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java index 740882b739..5a6e9d1f55 100644 --- a/src/java/org/apache/cassandra/replication/UnreconciledMutations.java +++ b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java @@ -170,8 +170,19 @@ public class UnreconciledMutations int cmp = start.token.compareTo(end.token); if (cmp == 0) { - // full range - return collect(statesSet, tableId, includePending, into); + // When start and end tokens are equal, check if this is a single-token range + // Single-token ranges have start inclusive (offset=0) and end inclusive (offset=MAX_VALUE) + if (start.offset == 0 && end.offset == Integer.MAX_VALUE) + { + // Single token range - collect only mutations for this specific token + SortedSet<Entry> subset = statesSet.subSet(Entry.start(start.token, true), Entry.end(end.token, true)); + return collect(subset, tableId, includePending, into); + } + else + { + // Full range - collect all mutations + return collect(statesSet, tableId, includePending, into); + } } else if (cmp > 0) { diff --git a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java index cd3b137b6f..c4db7ce259 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java @@ -35,9 +35,13 @@ import org.apache.cassandra.replication.Log2OffsetsMap; import org.apache.cassandra.replication.MutationJournal; import org.apache.cassandra.replication.ShortMutationId; import org.apache.cassandra.transport.Dispatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public interface PartialTrackedRead { + Logger logger = LoggerFactory.getLogger(PartialTrackedRead.class); + interface CompletedRead extends AutoCloseable { TrackedDataResponse response(); // must be called from the read stage @@ -95,6 +99,11 @@ public interface PartialTrackedRead { Mutation mutation = MutationJournal.instance.read(mutationId); Preconditions.checkNotNull(mutation); + if (!command().selectsKey(mutation.key())) + { + logger.trace("Skipping mutation {} - {} not in read range", mutationId, mutation.key()); + return; + } augment(mutation); } diff --git a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java index 368a4f0bfd..895706f056 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java @@ -20,8 +20,6 @@ package org.apache.cassandra.service.reads.tracked; import java.util.List; -import com.google.common.base.Preconditions; - import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadExecutionController; @@ -78,7 +76,11 @@ public class PartialTrackedSinglePartitionRead extends AbstractPartialTrackedRea @Override public State augment(PartitionUpdate update) { - Preconditions.checkArgument(update.partitionKey().equals(command.partitionKey())); + if (!update.partitionKey().equals(command.partitionKey())) + throw new IllegalArgumentException(String.format("Received update for partition key %s but command was for %s", + update.partitionKey(), + command.partitionKey())); + if (augmentedData == null) augmentedData = new SimpleBTreePartition(command.partitionKey(), command.metadata(), UpdateTransaction.NO_OP); diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java index 17851a24e3..1c57781c00 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java @@ -263,6 +263,20 @@ public class ReadReconciliations implements ExpiredStatePurger.Expireable /** * Remote summaries minus data node summary offsets + * + * This calculation combines BOTH reconciled and unreconciled mutations reported by other nodes, and + * then subtracts mutations reported locally for correctness + * + * If we subtracted reconciled ids from the unreconciled ids, we could violate read monotonicity in this scenario: + * 1. Read starts locally and doesn't see mutation M. + * 2. During reconciliation, mutation M arrives and is marked reconciled, other replicas report mutation M as reconciled + * 3. If we filtered out reconciled mutations, this read wouldn't augment with M + * 4. A concurrent read could see M in its initial data + * 5. This read returns without M + * + * Instead, we include all mutations and rely on token range filtering during actual mutation + * retrieval (in PartialTrackedRead.augment) to ensure we only augment with mutations + * relevant to this read's range/key */ private Log2OffsetsMap.Mutable augmentingOffsets() { diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java index ab705a6346..066ebc22e0 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java @@ -222,7 +222,19 @@ public class TrackedLocalReads implements ExpiredStatePurger.Expireable void acknowledgeReconcile(Log2OffsetsMap<?> augmentingOffsets) { logger.trace("Reconciliation completed for {}, missing {}", readId, augmentingOffsets); - Stage.READ.submit(() -> { read.augment(augmentingOffsets); complete(); }); + + Stage.READ.submit(() -> { + + try + { + read.augment(augmentingOffsets); + complete(); + } catch (Throwable t) { + logger.error("Exception thrown during read completion", t); + promise.tryFailure(t); + throw t; + } + }); } private void complete() diff --git a/test/unit/org/apache/cassandra/replication/OffsetsTest.java b/test/unit/org/apache/cassandra/replication/OffsetsTest.java index 5d10ae5e45..61c0a9ba5f 100644 --- a/test/unit/org/apache/cassandra/replication/OffsetsTest.java +++ b/test/unit/org/apache/cassandra/replication/OffsetsTest.java @@ -838,4 +838,26 @@ public class OffsetsTest from.remove(toRemove); assertOffsetsEqual(expectedAfter, from); } + + @Test + public void testRemoveWithExactSizedArray() + { + { + Offsets.Mutable offsets = new Offsets.Mutable(LOG_ID, new int[] {10, 11}, 2); + offsets.remove(10); + assertOffsetsEqual(offsets(11, 11), offsets); + } + + { + Offsets.Mutable offsets = new Offsets.Mutable(LOG_ID, new int[] {10, 11}, 2); + offsets.remove(11); + assertOffsetsEqual(offsets(10, 10), offsets); + } + + { + Offsets.Mutable offsets = new Offsets.Mutable(LOG_ID, new int[] {11, 11}, 2); + offsets.remove(11); + assertOffsetsEqual(offsets(), offsets); + } + } } diff --git a/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java b/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java new file mode 100644 index 0000000000..dfa025a327 --- /dev/null +++ b/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class UnreconciledMutationsTest +{ + private static final String KEYSPACE = "ks"; + private static final String TABLE = "tbl"; + + @BeforeClass + public static void setUp() throws IOException + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(3), + TableMetadata.builder(KEYSPACE, TABLE) + .addPartitionKeyColumn("k", Int32Type.instance) + .addRegularColumn("v", Int32Type.instance) + .build()); + } + + private static Token tokenFor(int key) + { + return new ByteOrderedPartitioner.BytesToken(ByteBufferUtil.bytes(key)); + } + + private static Mutation createMutation(int partitionKey, int value, int offset) + { + TableMetadata metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE); + + // Create a MutationId with logId 1L and sequenceId that produces the desired offset + long logId = 1L; + long sequenceId = ((long) offset << 32) | 1000; // offset in high bits, timestamp in low bits + MutationId mutationId = new MutationId(logId, sequenceId); + + Mutation mutation = new RowUpdateBuilder(metadata, 0, partitionKey) + .add("v", value) + .build() + .withMutationId(mutationId); + return mutation; + } + + private static void addToOffsets(Offsets.OffsetReciever receiver, int... offsets) + { + for (int offset : offsets) + receiver.add(offset); + } + + @Test + public void testSingleTokenCollectionIsolation() + { + UnreconciledMutations unreconciled = new UnreconciledMutations(); + TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id; + + // Create mutations for different partition keys with different tokens + Mutation mutation1 = createMutation(100, 1000, 1); + Mutation mutation2 = createMutation(200, 2000, 2); + Mutation mutation3 = createMutation(300, 3000, 3); + + Token token1 = mutation1.key().getToken(); + Token token2 = mutation2.key().getToken(); + Token token3 = mutation3.key().getToken(); + + // Add all mutations to unreconciled state + unreconciled.startWriting(mutation1); + unreconciled.startWriting(mutation2); + unreconciled.startWriting(mutation3); + + // Make them visible + unreconciled.finishWriting(mutation1); + unreconciled.finishWriting(mutation2); + unreconciled.finishWriting(mutation3); + + // Test single token collection for token1 - should ONLY return mutation1 + Offsets.Mutable offsets1 = new Offsets.Mutable(new CoordinatorLogId(1, 1)); + boolean found1 = unreconciled.collect(token1, tableId, false, offsets1); + + Assert.assertTrue("Should find mutations for token1", found1); + Assert.assertEquals("Should only contain 1 offset for token1", 1, offsets1.offsetCount()); + Assert.assertTrue("Should contain offset 1 for mutation1", offsets1.contains(1)); + Assert.assertFalse("Should NOT contain offset 2 for mutation2", offsets1.contains(2)); + Assert.assertFalse("Should NOT contain offset 3 for mutation3", offsets1.contains(3)); + + // Test single token collection for token2 - should ONLY return mutation2 + Offsets.Mutable offsets2 = new Offsets.Mutable(new CoordinatorLogId(1, 1)); + boolean found2 = unreconciled.collect(token2, tableId, false, offsets2); + + Assert.assertTrue("Should find mutations for token2", found2); + Assert.assertEquals("Should only contain 1 offset for token2", 1, offsets2.offsetCount()); + Assert.assertTrue("Should contain offset 2 for mutation2", offsets2.contains(2)); + Assert.assertFalse("Should NOT contain offset 1 for mutation1", offsets2.contains(1)); + Assert.assertFalse("Should NOT contain offset 3 for mutation3", offsets2.contains(3)); + + // Test single token collection for token3 - should ONLY return mutation3 + Offsets.Mutable offsets3 = new Offsets.Mutable(new CoordinatorLogId(1, 1)); + boolean found3 = unreconciled.collect(token3, tableId, false, offsets3); + + Assert.assertTrue("Should find mutations for token3", found3); + Assert.assertEquals("Should only contain 1 offset for token3", 1, offsets3.offsetCount()); + Assert.assertTrue("Should contain offset 3 for mutation3", offsets3.contains(3)); + Assert.assertFalse("Should NOT contain offset 1 for mutation1", offsets3.contains(1)); + Assert.assertFalse("Should NOT contain offset 2 for mutation2", offsets3.contains(2)); + } + + @Test + public void testEmptyCollection() + { + UnreconciledMutations unreconciled = new UnreconciledMutations(); + TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id; + Token token = tokenFor(100); + + Offsets.Mutable offsets = new Offsets.Mutable(new CoordinatorLogId(1, 1)); + boolean found = unreconciled.collect(token, tableId, false, offsets); + + Assert.assertFalse("Should not find any mutations in empty collection", found); + Assert.assertTrue("Should have no offsets", offsets.isEmpty()); + } + + @Test + public void testPendingVsVisibleMutations() + { + UnreconciledMutations unreconciled = new UnreconciledMutations(); + TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id; + + Mutation pendingMutation = createMutation(100, 1000, 1); + Mutation visibleMutation = createMutation(100, 2000, 2); + Token token = pendingMutation.key().getToken(); + + // Add one pending, one visible + unreconciled.startWriting(pendingMutation); + unreconciled.startWriting(visibleMutation); + unreconciled.finishWriting(visibleMutation); // Only make the second visible + + // Test without including pending - should only get visible mutation + Offsets.Mutable visibleOnly = new Offsets.Mutable(new CoordinatorLogId(1, 1)); + boolean foundVisible = unreconciled.collect(token, tableId, false, visibleOnly); + + Assert.assertTrue("Should find visible mutations", foundVisible); + Assert.assertEquals("Should only have 1 visible mutation", 1, visibleOnly.offsetCount()); + Assert.assertTrue("Should contain visible mutation offset", visibleOnly.contains(2)); + Assert.assertFalse("Should NOT contain pending mutation offset", visibleOnly.contains(1)); + + // Test including pending - should get both mutations + Offsets.Mutable includingPending = new Offsets.Mutable(new CoordinatorLogId(1, 1)); + boolean foundAll = unreconciled.collect(token, tableId, true, includingPending); + + Assert.assertTrue("Should find all mutations", foundAll); + Assert.assertEquals("Should have 2 mutations total", 2, includingPending.offsetCount()); + Assert.assertTrue("Should contain pending mutation offset", includingPending.contains(1)); + Assert.assertTrue("Should contain visible mutation offset", includingPending.contains(2)); + } + + @Test + public void testMultipleMutationsSameToken() + { + UnreconciledMutations unreconciled = new UnreconciledMutations(); + TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id; + + // Create multiple mutations for the same partition key (same token) + Mutation mutation1 = createMutation(100, 1000, 1); + Mutation mutation2 = createMutation(100, 2000, 2); + Mutation mutation3 = createMutation(100, 3000, 3); + Token token = mutation1.key().getToken(); + + unreconciled.startWriting(mutation1); + unreconciled.startWriting(mutation2); + unreconciled.startWriting(mutation3); + + unreconciled.finishWriting(mutation1); + unreconciled.finishWriting(mutation2); + unreconciled.finishWriting(mutation3); + + Offsets.Mutable offsets = new Offsets.Mutable(new CoordinatorLogId(1, 1)); + boolean found = unreconciled.collect(token, tableId, false, offsets); + + Assert.assertTrue("Should find mutations for token", found); + Assert.assertEquals("Should have all 3 mutations for same token", 3, offsets.offsetCount()); + Assert.assertTrue("Should contain offset 1", offsets.contains(1)); + Assert.assertTrue("Should contain offset 2", offsets.contains(2)); + Assert.assertTrue("Should contain offset 3", offsets.contains(3)); + } + + @Test + public void testTableIdFiltering() + { + UnreconciledMutations unreconciled = new UnreconciledMutations(); + TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id; + + // Create a fake different table ID + TableId differentTableId = TableId.generate(); + + Mutation mutation = createMutation(100, 1000, 1); + Token token = mutation.key().getToken(); + + unreconciled.startWriting(mutation); + unreconciled.finishWriting(mutation); + + // Query with correct table ID - should find mutation + Offsets.Mutable correctTable = new Offsets.Mutable(new CoordinatorLogId(1, 1)); + boolean foundCorrect = unreconciled.collect(token, tableId, false, correctTable); + + Assert.assertTrue("Should find mutation for correct table", foundCorrect); + Assert.assertEquals("Should have 1 mutation", 1, correctTable.offsetCount()); + + // Query with different table ID - should find nothing + Offsets.Mutable differentTable = new Offsets.Mutable(new CoordinatorLogId(1, 1)); + boolean foundDifferent = unreconciled.collect(token, differentTableId, false, differentTable); + + Assert.assertFalse("Should not find mutation for different table", foundDifferent); + Assert.assertTrue("Should have no mutations", differentTable.isEmpty()); + } + + @Test + public void testMutationRemoval() + { + UnreconciledMutations unreconciled = new UnreconciledMutations(); + TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id; + + Mutation mutation = createMutation(100, 1000, 1); + Token token = mutation.key().getToken(); + + unreconciled.startWriting(mutation); + unreconciled.finishWriting(mutation); + + // Verify mutation is present + Offsets.Mutable beforeRemoval = new Offsets.Mutable(new CoordinatorLogId(1, 1)); + boolean foundBefore = unreconciled.collect(token, tableId, false, beforeRemoval); + Assert.assertTrue("Should find mutation before removal", foundBefore); + Assert.assertEquals("Should have 1 mutation before removal", 1, beforeRemoval.offsetCount()); + + // Remove the mutation + unreconciled.remove(1); + + // Verify mutation is gone + Offsets.Mutable afterRemoval = new Offsets.Mutable(new CoordinatorLogId(1, 1)); + boolean foundAfter = unreconciled.collect(token, tableId, false, afterRemoval); + Assert.assertFalse("Should not find mutation after removal", foundAfter); + Assert.assertTrue("Should have no mutations after removal", afterRemoval.isEmpty()); + } + + @Test + public void testTokenRangeCollection() + { + UnreconciledMutations unreconciled = new UnreconciledMutations(); + TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id; + + // Create mutations with different tokens and sort them + List<Integer> keys = List.of(100, 200, 300, 400, 500); + List<Mutation> mutations = new ArrayList<>(); + for (int i = 0; i < keys.size(); i++) + { + Mutation mutation = createMutation(keys.get(i), keys.get(i) * 10, i + 1); + mutations.add(mutation); + unreconciled.startWriting(mutation); + unreconciled.finishWriting(mutation); + } + + // Sort mutations by token for predictable range testing + mutations.sort((m1, m2) -> m1.key().getToken().compareTo(m2.key().getToken())); + + Token firstToken = mutations.get(0).key().getToken(); + Token middleToken = mutations.get(2).key().getToken(); + Token lastToken = mutations.get(4).key().getToken(); + + // Test range from first to middle (inclusive) + DecoratedKey startKey = mutations.get(0).key(); + DecoratedKey endKey = mutations.get(2).key(); + Range<Token> range = + new Range<>(firstToken, middleToken); + AbstractBounds<PartitionPosition> bounds = + new Bounds<>(startKey, endKey); + + Offsets.Mutable rangeOffsets = new Offsets.Mutable(new CoordinatorLogId(1, 1)); + boolean foundRange = unreconciled.collect(bounds, tableId, false, rangeOffsets); + + Assert.assertTrue("Should find mutations in range", foundRange); + // Should include mutations at positions 0, 1, and 2 (first through middle inclusive) + Assert.assertTrue("Range should be inclusive of boundaries", rangeOffsets.offsetCount() >= 3); + } + + @Test + public void testSingleTokenRangeCollectionBug() + { + UnreconciledMutations unreconciled = new UnreconciledMutations(); + TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id; + + // Create mutations for different partition keys with different tokens + Mutation mutation1 = createMutation(100, 1000, 1); + Mutation mutation2 = createMutation(200, 2000, 2); + Mutation mutation3 = createMutation(300, 3000, 3); + + // Add all mutations to unreconciled state + unreconciled.startWriting(mutation1); + unreconciled.startWriting(mutation2); + unreconciled.startWriting(mutation3); + + // Make them visible + unreconciled.finishWriting(mutation1); + unreconciled.finishWriting(mutation2); + unreconciled.finishWriting(mutation3); + + // Test single token range collection + Offsets.Mutable offsets = new Offsets.Mutable(new CoordinatorLogId(1, 1)); + Bounds<PartitionPosition> bounds = new Bounds<>(mutation1.key(), mutation1.key()); + boolean found = unreconciled.collect(bounds, tableId, false, offsets); + + // This should only return mutation1 + Assert.assertTrue("Should find mutations for single token range", found); + Assert.assertEquals("Single token range should only return 1 mutation", 1, offsets.offsetCount()); + Assert.assertTrue("Should contain mutation1 offset", offsets.contains(1)); + + // other mutations should not be included + Assert.assertFalse("Single token range collection should NOT contain mutation2", offsets.contains(2)); + Assert.assertFalse("Single token range collection should NOT contain mutation3", offsets.contains(3)); + } + + @Test + public void testFullRangeCollectionWithMinimumToken() + { + UnreconciledMutations unreconciled = new UnreconciledMutations(); + TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id; + + // Create mutations for different partition keys with different tokens + Mutation mutation1 = createMutation(100, 1000, 1); + Mutation mutation2 = createMutation(200, 2000, 2); + Mutation mutation3 = createMutation(300, 3000, 3); + + // Add all mutations to unreconciled state + unreconciled.startWriting(mutation1); + unreconciled.startWriting(mutation2); + unreconciled.startWriting(mutation3); + + // Make them visible + unreconciled.finishWriting(mutation1); + unreconciled.finishWriting(mutation2); + unreconciled.finishWriting(mutation3); + + // Create a full-range query using minimum token pattern (same start/end with minimum token) + // This is the legitimate case where cmp == 0 should mean "full range" + Token minimumToken = DatabaseDescriptor.getPartitioner().getMinimumToken(); + Range<PartitionPosition> fullRange = new Range<>(minimumToken.minKeyBound(), minimumToken.minKeyBound()); + + // Test full range collection - this SHOULD return ALL mutations + Offsets.Mutable offsets = new Offsets.Mutable(new CoordinatorLogId(1, 1)); + boolean found = unreconciled.collect(fullRange, tableId, false, offsets); + + // Full range should return ALL mutations + Assert.assertTrue("Should find mutations for full range", found); + Assert.assertEquals("Full range should return ALL mutations", 3, offsets.offsetCount()); + Assert.assertTrue("Should contain mutation1 offset", offsets.contains(1)); + Assert.assertTrue("Should contain mutation2 offset", offsets.contains(2)); + Assert.assertTrue("Should contain mutation3 offset", offsets.contains(3)); + } + + @Test + public void testSingleTokenCollectionVsFullRange() + { + UnreconciledMutations unreconciled = new UnreconciledMutations(); + TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, TABLE).id; + + // Create mutations with different tokens + Mutation mutation1 = createMutation(100, 1000, 1); + Mutation mutation2 = createMutation(200, 2000, 2); + Mutation mutation3 = createMutation(300, 3000, 3); + + Token targetToken = mutation1.key().getToken(); + + unreconciled.startWriting(mutation1); + unreconciled.startWriting(mutation2); + unreconciled.startWriting(mutation3); + + unreconciled.finishWriting(mutation1); + unreconciled.finishWriting(mutation2); + unreconciled.finishWriting(mutation3); + + // Single token collection should only return mutation1 + Offsets.Mutable singleToken = new Offsets.Mutable(new CoordinatorLogId(1, 1)); + boolean foundSingle = unreconciled.collect(targetToken, tableId, false, singleToken); + + Assert.assertTrue("Should find mutation for single token", foundSingle); + Assert.assertEquals("Single token should only return 1 mutation", 1, singleToken.offsetCount()); + Assert.assertTrue("Should contain mutation1 offset", singleToken.contains(1)); + + // This is the CRITICAL test - single token collection must NOT return all mutations + // If the bug exists, this test will fail because it returns all 3 mutations instead of 1 + Assert.assertFalse("Single token collection should NOT contain mutation2", singleToken.contains(2)); + Assert.assertFalse("Single token collection should NOT contain mutation3", singleToken.contains(3)); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
