Author: slebresne Date: Wed Nov 23 07:59:45 2011 New Revision: 1205316 URL: http://svn.apache.org/viewvc?rev=1205316&view=rev Log: Fix array out of bounds error in counter shard removal patch by slebresne; reviewed by yukim for CASSANDRA-3514
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1205316&r1=1205315&r2=1205316&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Wed Nov 23 07:59:45 2011 @@ -36,7 +36,7 @@ * CFMetaData.convertToThrift method to set RowCacheProvider (CASSANDRA-3405) * acquire compactionlock during truncate (CASSANDRA-3399) * fix displaying cfdef entries for super columnfamilies (CASSANDRA-3415) - * Make counter shard merging thread safe (CASSANDRA-3178) + * Make counter shard merging thread safe (CASSANDRA-3178, -3514) * Fix bug preventing the use of efficient cross-DC writes (CASSANDRA-3472) * (Hadoop) skip empty rows when entire row is requested, redux (CASSANDRA-2855) * fix concurrence issue in the FailureDetector (CASSANDRA-3519) Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java?rev=1205316&r1=1205315&r2=1205316&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java Wed Nov 23 07:59:45 2011 @@ -628,6 +628,7 @@ public class CounterContext implements I int hlength = headerLength(context); ContextState state = new ContextState(context, hlength); int removedShards = 0; + int removedDelta = 0; while (state.hasRemaining()) { long clock = state.getClock(); @@ -655,7 +656,11 @@ public class CounterContext implements I } if (-((int)(clock / 1000)) < gcBefore) + { removedShards++; + if (state.isDelta()) + removedDelta++; + } } state.moveToNext(); } @@ -663,9 +668,9 @@ public class CounterContext implements I if (removedShards == 0) return context; - - int removedHeaderSize = removedShards * HEADER_ELT_LENGTH; - int newSize = context.remaining() - removedHeaderSize - (removedShards * STEP_LENGTH); + int removedHeaderSize = removedDelta * HEADER_ELT_LENGTH; + int removedBodySize = removedShards * STEP_LENGTH; + int newSize = context.remaining() - removedHeaderSize - removedBodySize; int newHlength = hlength - removedHeaderSize; ByteBuffer cleanedContext = ByteBuffer.allocate(newSize); cleanedContext.putShort(cleanedContext.position(), (short) ((newHlength - HEADER_SIZE_LENGTH) / HEADER_ELT_LENGTH)); @@ -675,10 +680,11 @@ public class CounterContext implements I while (state.hasRemaining()) { long clock = state.getClock(); - if (!(clock < 0 && state.getCount() == 0)) + if (clock >= 0 || state.getCount() != 0 || -((int)(clock / 1000)) >= gcBefore) { state.copyTo(cleaned); } + state.moveToNext(); } return cleanedContext; Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java?rev=1205316&r1=1205315&r2=1205316&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java (original) +++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java Wed Nov 23 07:59:45 2011 @@ -381,4 +381,81 @@ public class CounterContextTest assert cc.total(ctx.context) == cc.total(cleaned); assert cleaned.remaining() == ctx.context.remaining() - stepLength - 2; } + + @Test + public void testRemoveOldShardsNotAllExpiring() + { + runRemoveOldShardsNotAllExpiring(HeapAllocator.instance); + runRemoveOldShardsNotAllExpiring(bumpedSlab()); + } + + private void runRemoveOldShardsNotAllExpiring(Allocator allocator) + { + NodeId id1 = NodeId.fromInt(1); + NodeId id3 = NodeId.fromInt(3); + NodeId id6 = NodeId.fromInt(6); + List<NodeId.NodeIdRecord> records = new ArrayList<NodeId.NodeIdRecord>(); + records.add(new NodeId.NodeIdRecord(id1, 2L)); + records.add(new NodeId.NodeIdRecord(id3, 4L)); + records.add(new NodeId.NodeIdRecord(id6, 10L)); + + ContextState ctx = ContextState.allocate(6, 3, allocator); + ctx.writeElement(id1, 0L, 1L, true); + ctx.writeElement(NodeId.fromInt(2), 0L, 2L); + ctx.writeElement(id3, 0L, 3L, true); + ctx.writeElement(NodeId.fromInt(4), 0L, 3L); + ctx.writeElement(NodeId.fromInt(5), 0L, 3L, true); + ctx.writeElement(id6, 0L, 6L); + + int timeFirstMerge = (int)(System.currentTimeMillis() / 1000); + + // First, only merge the first id + ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records, 3L); + ByteBuffer merged = cc.merge(ctx.context, merger, allocator); + assert cc.total(ctx.context) == cc.total(merged); + + try + { + Thread.sleep(2000); + } + catch (InterruptedException e) + { + throw new AssertionError(); + } + + // merge the second one + ByteBuffer merger2 = cc.computeOldShardMerger(merged, records, 7L); + ByteBuffer merged2 = cc.merge(merged, merger2, allocator); + assert cc.total(ctx.context) == cc.total(merged2); + + ByteBuffer cleaned = cc.removeOldShards(merged2, timeFirstMerge + 1); + assert cc.total(ctx.context) == cc.total(cleaned); + assert cleaned.remaining() == ctx.context.remaining(); + + // We should have cleaned id1 but not id3 + ContextState m = new ContextState(cleaned); + m.moveToNext(); + assert m.getNodeId().equals(id3); + + } + + @Test + public void testRemoveNotDeltaOldShards() + { + runRemoveNotDeltaOldShards(HeapAllocator.instance); + runRemoveNotDeltaOldShards(bumpedSlab()); + } + + private void runRemoveNotDeltaOldShards(Allocator allocator) + { + ContextState ctx = ContextState.allocate(4, 1, allocator); + ctx.writeElement(NodeId.fromInt(1), 1L, 1L, true); + ctx.writeElement(NodeId.fromInt(2), -System.currentTimeMillis(), 0L); + ctx.writeElement(NodeId.fromInt(3), -System.currentTimeMillis(), 0L); + ctx.writeElement(NodeId.fromInt(4), -System.currentTimeMillis(), 0L); + + ByteBuffer cleaned = cc.removeOldShards(ctx.context, (int)(System.currentTimeMillis() / 1000) + 1); + assert cc.total(ctx.context) == cc.total(cleaned); + assert cleaned.remaining() == ctx.context.remaining() - 3 * stepLength; + } }