Updated Branches: refs/heads/trunk 7faff8db7 -> a0fb58f6f
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/test/unit/org/apache/cassandra/db/context/CounterContextTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/context/CounterContextTest.java b/test/unit/org/apache/cassandra/db/context/CounterContextTest.java index 0d05340..5c88fd6 100644 --- a/test/unit/org/apache/cassandra/db/context/CounterContextTest.java +++ b/test/unit/org/apache/cassandra/db/context/CounterContextTest.java @@ -23,36 +23,25 @@ package org.apache.cassandra.db.context; import static org.junit.Assert.*; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; import org.junit.Test; -import org.apache.cassandra.Util; import org.apache.cassandra.db.context.IContext.ContextRelationship; -import static org.apache.cassandra.db.context.CounterContext.ContextState; +import org.apache.cassandra.Util; import org.apache.cassandra.utils.*; -import com.google.common.util.concurrent.Uninterruptibles; +import static org.apache.cassandra.db.context.CounterContext.ContextState; public class CounterContextTest { private static final CounterContext cc = new CounterContext(); - private static final int idLength; - private static final int clockLength; - private static final int countLength; - - private static final int stepLength; - - static - { - idLength = CounterId.LENGTH; // size of int - clockLength = 8; // size of long - countLength = 8; // size of long - stepLength = idLength + clockLength + countLength; - } + private static final int headerSizeLength = 2; + private static final int headerEltLength = 2; + private static final int idLength = 16; + private static final int clockLength = 8; + private static final int countLength = 8; + private static final int stepLength = idLength + clockLength + countLength; /** Allocates 1 byte from a new SlabAllocator and returns it. */ private Allocator bumpedSlab() @@ -63,16 +52,25 @@ public class CounterContextTest } @Test - public void testCreate() + public void testAllocate() { - runCreate(HeapAllocator.instance); - runCreate(bumpedSlab()); + runAllocate(HeapAllocator.instance); + runAllocate(bumpedSlab()); } - private void runCreate(Allocator allocator) + private void runAllocate(Allocator allocator) { - ByteBuffer bytes = cc.create(4, allocator); - assertEquals(stepLength + 4, bytes.remaining()); + ContextState allGlobal = ContextState.allocate(3, 0, 0, allocator); + assertEquals(headerSizeLength + 3 * headerEltLength + 3 * stepLength, allGlobal.context.remaining()); + + ContextState allLocal = ContextState.allocate(0, 3, 0, allocator); + assertEquals(headerSizeLength + 3 * headerEltLength + 3 * stepLength, allLocal.context.remaining()); + + ContextState allRemote = ContextState.allocate(0, 0, 3, allocator); + assertEquals(headerSizeLength + 3 * stepLength, allRemote.context.remaining()); + + ContextState mixed = ContextState.allocate(1, 1, 1, allocator); + assertEquals(headerSizeLength + 2 * headerEltLength + 3 * stepLength, mixed.context.remaining()); } @Test @@ -84,188 +82,176 @@ public class CounterContextTest private void runDiff(Allocator allocator) { - ContextState left = ContextState.allocate(3, 0, allocator); + ContextState left; ContextState right; // equality: equal nodes, all counts same - left.writeElement(CounterId.fromInt(3), 3L, 0L); - left.writeElement(CounterId.fromInt(6), 2L, 0L); - left.writeElement(CounterId.fromInt(9), 1L, 0L); - right = new ContextState(ByteBufferUtil.clone(left.context), left.headerLength); + left = ContextState.allocate(0, 0, 3, allocator); + left.writeRemote(CounterId.fromInt(3), 3L, 0L); + left.writeRemote(CounterId.fromInt(6), 2L, 0L); + left.writeRemote(CounterId.fromInt(9), 1L, 0L); + right = ContextState.wrap(ByteBufferUtil.clone(left.context)); - assert ContextRelationship.EQUAL == - cc.diff(left.context, right.context); + assertEquals(ContextRelationship.EQUAL, cc.diff(left.context, right.context)); // greater than: left has superset of nodes (counts equal) - left = ContextState.allocate(4, 0, allocator); - left.writeElement(CounterId.fromInt(3), 3L, 0L); - left.writeElement(CounterId.fromInt(6), 2L, 0L); - left.writeElement(CounterId.fromInt(9), 1L, 0L); - left.writeElement(CounterId.fromInt(12), 0L, 0L); + left = ContextState.allocate(0, 0, 4, allocator); + left.writeRemote(CounterId.fromInt(3), 3L, 0L); + left.writeRemote(CounterId.fromInt(6), 2L, 0L); + left.writeRemote(CounterId.fromInt(9), 1L, 0L); + left.writeRemote(CounterId.fromInt(12), 0L, 0L); - right = ContextState.allocate(3, 0, allocator); - right.writeElement(CounterId.fromInt(3), 3L, 0L); - right.writeElement(CounterId.fromInt(6), 2L, 0L); - right.writeElement(CounterId.fromInt(9), 1L, 0L); + right = ContextState.allocate(0, 0, 3, allocator); + right.writeRemote(CounterId.fromInt(3), 3L, 0L); + right.writeRemote(CounterId.fromInt(6), 2L, 0L); + right.writeRemote(CounterId.fromInt(9), 1L, 0L); - assert ContextRelationship.GREATER_THAN == - cc.diff(left.context, right.context); + assertEquals(ContextRelationship.GREATER_THAN, cc.diff(left.context, right.context)); // less than: left has subset of nodes (counts equal) - left = ContextState.allocate(3, 0, allocator); - left.writeElement(CounterId.fromInt(3), 3L, 0L); - left.writeElement(CounterId.fromInt(6), 2L, 0L); - left.writeElement(CounterId.fromInt(9), 1L, 0L); + left = ContextState.allocate(0, 0, 3, allocator); + left.writeRemote(CounterId.fromInt(3), 3L, 0L); + left.writeRemote(CounterId.fromInt(6), 2L, 0L); + left.writeRemote(CounterId.fromInt(9), 1L, 0L); - right = ContextState.allocate(4, 0, allocator); - right.writeElement(CounterId.fromInt(3), 3L, 0L); - right.writeElement(CounterId.fromInt(6), 2L, 0L); - right.writeElement(CounterId.fromInt(9), 1L, 0L); - right.writeElement(CounterId.fromInt(12), 0L, 0L); + right = ContextState.allocate(0, 0, 4, allocator); + right.writeRemote(CounterId.fromInt(3), 3L, 0L); + right.writeRemote(CounterId.fromInt(6), 2L, 0L); + right.writeRemote(CounterId.fromInt(9), 1L, 0L); + right.writeRemote(CounterId.fromInt(12), 0L, 0L); - assert ContextRelationship.LESS_THAN == - cc.diff(left.context, right.context); + assertEquals(ContextRelationship.LESS_THAN, cc.diff(left.context, right.context)); // greater than: equal nodes, but left has higher counts - left = ContextState.allocate(3, 0, allocator); - left.writeElement(CounterId.fromInt(3), 3L, 0L); - left.writeElement(CounterId.fromInt(6), 2L, 0L); - left.writeElement(CounterId.fromInt(9), 3L, 0L); + left = ContextState.allocate(0, 0, 3, allocator); + left.writeRemote(CounterId.fromInt(3), 3L, 0L); + left.writeRemote(CounterId.fromInt(6), 2L, 0L); + left.writeRemote(CounterId.fromInt(9), 3L, 0L); - right = ContextState.allocate(3, 0, allocator); - right.writeElement(CounterId.fromInt(3), 3L, 0L); - right.writeElement(CounterId.fromInt(6), 2L, 0L); - right.writeElement(CounterId.fromInt(9), 1L, 0L); + right = ContextState.allocate(0, 0, 3, allocator); + right.writeRemote(CounterId.fromInt(3), 3L, 0L); + right.writeRemote(CounterId.fromInt(6), 2L, 0L); + right.writeRemote(CounterId.fromInt(9), 1L, 0L); - assert ContextRelationship.GREATER_THAN == - cc.diff(left.context, right.context); + assertEquals(ContextRelationship.GREATER_THAN, cc.diff(left.context, right.context)); // less than: equal nodes, but right has higher counts - left = ContextState.allocate(3, 0, allocator); - left.writeElement(CounterId.fromInt(3), 3L, 0L); - left.writeElement(CounterId.fromInt(6), 2L, 0L); - left.writeElement(CounterId.fromInt(9), 3L, 0L); + left = ContextState.allocate(0, 0, 3, allocator); + left.writeRemote(CounterId.fromInt(3), 3L, 0L); + left.writeRemote(CounterId.fromInt(6), 2L, 0L); + left.writeRemote(CounterId.fromInt(9), 3L, 0L); - right = ContextState.allocate(3, 0, allocator); - right.writeElement(CounterId.fromInt(3), 3L, 0L); - right.writeElement(CounterId.fromInt(6), 9L, 0L); - right.writeElement(CounterId.fromInt(9), 3L, 0L); + right = ContextState.allocate(0, 0, 3, allocator); + right.writeRemote(CounterId.fromInt(3), 3L, 0L); + right.writeRemote(CounterId.fromInt(6), 9L, 0L); + right.writeRemote(CounterId.fromInt(9), 3L, 0L); - assert ContextRelationship.LESS_THAN == - cc.diff(left.context, right.context); + assertEquals(ContextRelationship.LESS_THAN, cc.diff(left.context, right.context)); // disjoint: right and left have disjoint node sets - left = ContextState.allocate(3, 0, allocator); - left.writeElement(CounterId.fromInt(3), 1L, 0L); - left.writeElement(CounterId.fromInt(4), 1L, 0L); - left.writeElement(CounterId.fromInt(9), 1L, 0L); + left = ContextState.allocate(0, 0, 3, allocator); + left.writeRemote(CounterId.fromInt(3), 1L, 0L); + left.writeRemote(CounterId.fromInt(4), 1L, 0L); + left.writeRemote(CounterId.fromInt(9), 1L, 0L); - right = ContextState.allocate(3, 0, allocator); - right.writeElement(CounterId.fromInt(3), 1L, 0L); - right.writeElement(CounterId.fromInt(6), 1L, 0L); - right.writeElement(CounterId.fromInt(9), 1L, 0L); + right = ContextState.allocate(0, 0, 3, allocator); + right.writeRemote(CounterId.fromInt(3), 1L, 0L); + right.writeRemote(CounterId.fromInt(6), 1L, 0L); + right.writeRemote(CounterId.fromInt(9), 1L, 0L); - assert ContextRelationship.DISJOINT == - cc.diff(left.context, right.context); + assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context)); - left = ContextState.allocate(3, 0, allocator); - left.writeElement(CounterId.fromInt(3), 1L, 0L); - left.writeElement(CounterId.fromInt(4), 1L, 0L); - left.writeElement(CounterId.fromInt(9), 1L, 0L); + left = ContextState.allocate(0, 0, 3, allocator); + left.writeRemote(CounterId.fromInt(3), 1L, 0L); + left.writeRemote(CounterId.fromInt(4), 1L, 0L); + left.writeRemote(CounterId.fromInt(9), 1L, 0L); - right = ContextState.allocate(3, 0, allocator); - right.writeElement(CounterId.fromInt(2), 1L, 0L); - right.writeElement(CounterId.fromInt(6), 1L, 0L); - right.writeElement(CounterId.fromInt(12), 1L, 0L); + right = ContextState.allocate(0, 0, 3, allocator); + right.writeRemote(CounterId.fromInt(2), 1L, 0L); + right.writeRemote(CounterId.fromInt(6), 1L, 0L); + right.writeRemote(CounterId.fromInt(12), 1L, 0L); - assert ContextRelationship.DISJOINT == - cc.diff(left.context, right.context); + assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context)); // disjoint: equal nodes, but right and left have higher counts in differing nodes - left = ContextState.allocate(3, 0, allocator); - left.writeElement(CounterId.fromInt(3), 1L, 0L); - left.writeElement(CounterId.fromInt(6), 3L, 0L); - left.writeElement(CounterId.fromInt(9), 1L, 0L); + left = ContextState.allocate(0, 0, 3, allocator); + left.writeRemote(CounterId.fromInt(3), 1L, 0L); + left.writeRemote(CounterId.fromInt(6), 3L, 0L); + left.writeRemote(CounterId.fromInt(9), 1L, 0L); - right = ContextState.allocate(3, 0, allocator); - right.writeElement(CounterId.fromInt(3), 1L, 0L); - right.writeElement(CounterId.fromInt(6), 1L, 0L); - right.writeElement(CounterId.fromInt(9), 5L, 0L); + right = ContextState.allocate(0, 0, 3, allocator); + right.writeRemote(CounterId.fromInt(3), 1L, 0L); + right.writeRemote(CounterId.fromInt(6), 1L, 0L); + right.writeRemote(CounterId.fromInt(9), 5L, 0L); - assert ContextRelationship.DISJOINT == - cc.diff(left.context, right.context); + assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context)); - left = ContextState.allocate(3, 0, allocator); - left.writeElement(CounterId.fromInt(3), 2L, 0L); - left.writeElement(CounterId.fromInt(6), 3L, 0L); - left.writeElement(CounterId.fromInt(9), 1L, 0L); + left = ContextState.allocate(0, 0, 3, allocator); + left.writeRemote(CounterId.fromInt(3), 2L, 0L); + left.writeRemote(CounterId.fromInt(6), 3L, 0L); + left.writeRemote(CounterId.fromInt(9), 1L, 0L); - right = ContextState.allocate(3, 0, allocator); - right.writeElement(CounterId.fromInt(3), 1L, 0L); - right.writeElement(CounterId.fromInt(6), 9L, 0L); - right.writeElement(CounterId.fromInt(9), 5L, 0L); + right = ContextState.allocate(0, 0, 3, allocator); + right.writeRemote(CounterId.fromInt(3), 1L, 0L); + right.writeRemote(CounterId.fromInt(6), 9L, 0L); + right.writeRemote(CounterId.fromInt(9), 5L, 0L); - assert ContextRelationship.DISJOINT == - cc.diff(left.context, right.context); + assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context)); // disjoint: left has more nodes, but lower counts - left = ContextState.allocate(4, 0, allocator); - left.writeElement(CounterId.fromInt(3), 2L, 0L); - left.writeElement(CounterId.fromInt(6), 3L, 0L); - left.writeElement(CounterId.fromInt(9), 1L, 0L); - left.writeElement(CounterId.fromInt(12), 1L, 0L); + left = ContextState.allocate(0, 0, 4, allocator); + left.writeRemote(CounterId.fromInt(3), 2L, 0L); + left.writeRemote(CounterId.fromInt(6), 3L, 0L); + left.writeRemote(CounterId.fromInt(9), 1L, 0L); + left.writeRemote(CounterId.fromInt(12), 1L, 0L); - right = ContextState.allocate(3, 0, allocator); - right.writeElement(CounterId.fromInt(3), 4L, 0L); - right.writeElement(CounterId.fromInt(6), 9L, 0L); - right.writeElement(CounterId.fromInt(9), 5L, 0L); + right = ContextState.allocate(0, 0, 3, allocator); + right.writeRemote(CounterId.fromInt(3), 4L, 0L); + right.writeRemote(CounterId.fromInt(6), 9L, 0L); + right.writeRemote(CounterId.fromInt(9), 5L, 0L); - assert ContextRelationship.DISJOINT == - cc.diff(left.context, right.context); + assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context)); // disjoint: left has less nodes, but higher counts - left = ContextState.allocate(3, 0, allocator); - left.writeElement(CounterId.fromInt(3), 5L, 0L); - left.writeElement(CounterId.fromInt(6), 3L, 0L); - left.writeElement(CounterId.fromInt(9), 2L, 0L); + left = ContextState.allocate(0, 0, 3, allocator); + left.writeRemote(CounterId.fromInt(3), 5L, 0L); + left.writeRemote(CounterId.fromInt(6), 3L, 0L); + left.writeRemote(CounterId.fromInt(9), 2L, 0L); - right = ContextState.allocate(4, 0, allocator); - right.writeElement(CounterId.fromInt(3), 4L, 0L); - right.writeElement(CounterId.fromInt(6), 3L, 0L); - right.writeElement(CounterId.fromInt(9), 2L, 0L); - right.writeElement(CounterId.fromInt(12), 1L, 0L); + right = ContextState.allocate(0, 0, 4, allocator); + right.writeRemote(CounterId.fromInt(3), 4L, 0L); + right.writeRemote(CounterId.fromInt(6), 3L, 0L); + right.writeRemote(CounterId.fromInt(9), 2L, 0L); + right.writeRemote(CounterId.fromInt(12), 1L, 0L); - assert ContextRelationship.DISJOINT == - cc.diff(left.context, right.context); + assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context)); // disjoint: mixed nodes and counts - left = ContextState.allocate(3, 0, allocator); - left.writeElement(CounterId.fromInt(3), 5L, 0L); - left.writeElement(CounterId.fromInt(6), 2L, 0L); - left.writeElement(CounterId.fromInt(9), 2L, 0L); - - right = ContextState.allocate(4, 0, allocator); - right.writeElement(CounterId.fromInt(3), 4L, 0L); - right.writeElement(CounterId.fromInt(6), 3L, 0L); - right.writeElement(CounterId.fromInt(9), 2L, 0L); - right.writeElement(CounterId.fromInt(12), 1L, 0L); - - assert ContextRelationship.DISJOINT == - cc.diff(left.context, right.context); - - left = ContextState.allocate(4, 0, allocator); - left.writeElement(CounterId.fromInt(3), 5L, 0L); - left.writeElement(CounterId.fromInt(6), 2L, 0L); - left.writeElement(CounterId.fromInt(7), 2L, 0L); - left.writeElement(CounterId.fromInt(9), 2L, 0L); - - right = ContextState.allocate(3, 0, allocator); - right.writeElement(CounterId.fromInt(3), 4L, 0L); - right.writeElement(CounterId.fromInt(6), 3L, 0L); - right.writeElement(CounterId.fromInt(9), 2L, 0L); - - assert ContextRelationship.DISJOINT == - cc.diff(left.context, right.context); + left = ContextState.allocate(0, 0, 3, allocator); + left.writeRemote(CounterId.fromInt(3), 5L, 0L); + left.writeRemote(CounterId.fromInt(6), 2L, 0L); + left.writeRemote(CounterId.fromInt(9), 2L, 0L); + + right = ContextState.allocate(0, 0, 4, allocator); + right.writeRemote(CounterId.fromInt(3), 4L, 0L); + right.writeRemote(CounterId.fromInt(6), 3L, 0L); + right.writeRemote(CounterId.fromInt(9), 2L, 0L); + right.writeRemote(CounterId.fromInt(12), 1L, 0L); + + assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context)); + + left = ContextState.allocate(0, 0, 4, allocator); + left.writeRemote(CounterId.fromInt(3), 5L, 0L); + left.writeRemote(CounterId.fromInt(6), 2L, 0L); + left.writeRemote(CounterId.fromInt(7), 2L, 0L); + left.writeRemote(CounterId.fromInt(9), 2L, 0L); + + right = ContextState.allocate(0, 0, 3, allocator); + right.writeRemote(CounterId.fromInt(3), 4L, 0L); + right.writeRemote(CounterId.fromInt(6), 3L, 0L); + right.writeRemote(CounterId.fromInt(9), 2L, 0L); + + assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, right.context)); } @Test @@ -278,42 +264,121 @@ public class CounterContextTest private void runMerge(Allocator allocator) { // note: local counts aggregated; remote counts are reconciled (i.e. take max) - ContextState left = ContextState.allocate(4, 1, allocator); - left.writeElement(CounterId.fromInt(1), 1L, 1L); - left.writeElement(CounterId.fromInt(2), 2L, 2L); - left.writeElement(CounterId.fromInt(4), 6L, 3L); - left.writeElement(CounterId.getLocalId(), 7L, 3L, true); + ContextState left = ContextState.allocate(0, 1, 3, allocator); + left.writeRemote(CounterId.fromInt(1), 1L, 1L); + left.writeRemote(CounterId.fromInt(2), 2L, 2L); + left.writeRemote(CounterId.fromInt(4), 6L, 3L); + left.writeLocal(CounterId.getLocalId(), 7L, 3L); - ContextState right = ContextState.allocate(3, 1, allocator); - right.writeElement(CounterId.fromInt(4), 4L, 4L); - right.writeElement(CounterId.fromInt(5), 5L, 5L); - right.writeElement(CounterId.getLocalId(), 2L, 9L, true); + ContextState right = ContextState.allocate(0, 1, 2, allocator); + right.writeRemote(CounterId.fromInt(4), 4L, 4L); + right.writeRemote(CounterId.fromInt(5), 5L, 5L); + right.writeLocal(CounterId.getLocalId(), 2L, 9L); ByteBuffer merged = cc.merge(left.context, right.context, allocator); int hd = 4; assertEquals(hd + 5 * stepLength, merged.remaining()); // local node id's counts are aggregated - assert Util.equalsCounterId(CounterId.getLocalId(), merged, hd + 4 * stepLength); - assertEquals( 9L, merged.getLong(merged.position() + hd + 4*stepLength + idLength)); + assertTrue(Util.equalsCounterId(CounterId.getLocalId(), merged, hd + 4 * stepLength)); + assertEquals(9L, merged.getLong(merged.position() + hd + 4 * stepLength + idLength)); assertEquals(12L, merged.getLong(merged.position() + hd + 4*stepLength + idLength + clockLength)); // remote node id counts are reconciled (i.e. take max) - assert Util.equalsCounterId(CounterId.fromInt(4), merged, hd + 2 * stepLength); - assertEquals( 6L, merged.getLong(merged.position() + hd + 2*stepLength + idLength)); + assertTrue(Util.equalsCounterId(CounterId.fromInt(4), merged, hd + 2 * stepLength)); + assertEquals(6L, merged.getLong(merged.position() + hd + 2 * stepLength + idLength)); assertEquals( 3L, merged.getLong(merged.position() + hd + 2*stepLength + idLength + clockLength)); - assert Util.equalsCounterId(CounterId.fromInt(5), merged, hd + 3 * stepLength); - assertEquals( 5L, merged.getLong(merged.position() + hd + 3*stepLength + idLength)); + assertTrue(Util.equalsCounterId(CounterId.fromInt(5), merged, hd + 3 * stepLength)); + assertEquals(5L, merged.getLong(merged.position() + hd + 3 * stepLength + idLength)); assertEquals( 5L, merged.getLong(merged.position() + hd + 3*stepLength + idLength + clockLength)); - assert Util.equalsCounterId(CounterId.fromInt(2), merged, hd + 1 * stepLength); - assertEquals( 2L, merged.getLong(merged.position() + hd + 1*stepLength + idLength)); - assertEquals( 2L, merged.getLong(merged.position() + hd + 1*stepLength + idLength + clockLength)); - - assert Util.equalsCounterId(CounterId.fromInt(1), merged, hd + 0 * stepLength); - assertEquals( 1L, merged.getLong(merged.position() + hd + 0*stepLength + idLength)); - assertEquals( 1L, merged.getLong(merged.position() + hd + 0*stepLength + idLength + clockLength)); + assertTrue(Util.equalsCounterId(CounterId.fromInt(2), merged, hd + stepLength)); + assertEquals(2L, merged.getLong(merged.position() + hd + stepLength + idLength)); + assertEquals( 2L, merged.getLong(merged.position() + hd + stepLength + idLength + clockLength)); + + assertTrue(Util.equalsCounterId(CounterId.fromInt(1), merged, hd)); + assertEquals( 1L, merged.getLong(merged.position() + hd + idLength)); + assertEquals( 1L, merged.getLong(merged.position() + hd + idLength + clockLength)); + + // + // Test merging two exclusively global contexts + // + left = ContextState.allocate(3, 0, 0, allocator); + left.writeGlobal(CounterId.fromInt(1), 1L, 1L); + left.writeGlobal(CounterId.fromInt(2), 2L, 2L); + left.writeGlobal(CounterId.fromInt(3), 3L, 3L); + + right = ContextState.allocate(3, 0, 0, allocator); + right.writeGlobal(CounterId.fromInt(3), 6L, 6L); + right.writeGlobal(CounterId.fromInt(4), 4L, 4L); + right.writeGlobal(CounterId.fromInt(5), 5L, 5L); + + merged = cc.merge(left.context, right.context, allocator); + assertEquals(headerSizeLength + 5 * headerEltLength + 5 * stepLength, merged.remaining()); + assertEquals(18L, cc.total(merged)); + assertEquals(5, merged.getShort(merged.position())); + + int headerLength = headerSizeLength + 5 * headerEltLength; + assertTrue(Util.equalsCounterId(CounterId.fromInt(1), merged, headerLength)); + assertEquals(1L, merged.getLong(merged.position() + headerLength + idLength)); + assertEquals(1L, merged.getLong(merged.position() + headerLength + idLength + clockLength)); + assertTrue(Util.equalsCounterId(CounterId.fromInt(2), merged, headerLength + stepLength)); + assertEquals(2L, merged.getLong(merged.position() + headerLength + stepLength + idLength)); + assertEquals(2L, merged.getLong(merged.position() + headerLength + stepLength + idLength + clockLength)); + // pick the global shard with the largest clock + assertTrue(Util.equalsCounterId(CounterId.fromInt(3), merged, headerLength + 2 * stepLength)); + assertEquals(6L, merged.getLong(merged.position() + headerLength + 2 * stepLength + idLength)); + assertEquals(6L, merged.getLong(merged.position() + headerLength + 2 * stepLength + idLength + clockLength)); + assertTrue(Util.equalsCounterId(CounterId.fromInt(4), merged, headerLength + 3 * stepLength)); + assertEquals(4L, merged.getLong(merged.position() + headerLength + 3 * stepLength + idLength)); + assertEquals(4L, merged.getLong(merged.position() + headerLength + 3 * stepLength + idLength + clockLength)); + assertTrue(Util.equalsCounterId(CounterId.fromInt(5), merged, headerLength + 4 * stepLength)); + assertEquals(5L, merged.getLong(merged.position() + headerLength + 4 * stepLength + idLength)); + assertEquals(5L, merged.getLong(merged.position() + headerLength + 4 * stepLength + idLength + clockLength)); + + // + // Test merging two global contexts w/ 'invalid shards' + // + left = ContextState.allocate(1, 0, 0, allocator); + left.writeGlobal(CounterId.fromInt(1), 10L, 20L); + + right = ContextState.allocate(1, 0, 0, allocator); + right.writeGlobal(CounterId.fromInt(1), 10L, 30L); + + merged = cc.merge(left.context, right.context, allocator); + headerLength = headerSizeLength + headerEltLength; + assertEquals(headerLength + stepLength, merged.remaining()); + assertEquals(30L, cc.total(merged)); + assertEquals(1, merged.getShort(merged.position())); + assertTrue(Util.equalsCounterId(CounterId.fromInt(1), merged, headerLength)); + assertEquals(10L, merged.getLong(merged.position() + headerLength + idLength)); + // with equal clock, we should pick the largest value + assertEquals(30L, merged.getLong(merged.position() + headerLength + idLength + clockLength)); + + // + // Test merging global w/ mixed contexts + // + left = ContextState.allocate(2, 0, 0, allocator); + left.writeGlobal(CounterId.fromInt(1), 1L, 1L); + left.writeGlobal(CounterId.fromInt(2), 1L, 1L); + + right = ContextState.allocate(0, 1, 1, allocator); + right.writeLocal(CounterId.fromInt(1), 100L, 100L); + right.writeRemote(CounterId.fromInt(2), 100L, 100L); + + // global shards should dominate local/remote, even with lower clock and value + merged = cc.merge(left.context, right.context, allocator); + headerLength = headerSizeLength + 2 * headerEltLength; + assertEquals(headerLength + 2 * stepLength, merged.remaining()); + assertEquals(2L, cc.total(merged)); + assertEquals(2, merged.getShort(merged.position())); + assertTrue(Util.equalsCounterId(CounterId.fromInt(1), merged, headerLength)); + assertEquals(1L, merged.getLong(merged.position() + headerLength + idLength)); + assertEquals(1L, merged.getLong(merged.position() + headerLength + idLength + clockLength)); + assertTrue(Util.equalsCounterId(CounterId.fromInt(2), merged, headerLength + stepLength)); + assertEquals(1L, merged.getLong(merged.position() + headerLength + stepLength + idLength)); + assertEquals(1L, merged.getLong(merged.position() + headerLength + stepLength + idLength + clockLength)); } @Test @@ -325,173 +390,113 @@ public class CounterContextTest private void runTotal(Allocator allocator) { - ContextState left = ContextState.allocate(4, 1, allocator); - left.writeElement(CounterId.fromInt(1), 1L, 1L); - left.writeElement(CounterId.fromInt(2), 2L, 2L); - left.writeElement(CounterId.fromInt(4), 3L, 3L); - left.writeElement(CounterId.getLocalId(), 3L, 3L, true); - - ContextState right = ContextState.allocate(3, 1, allocator); - right.writeElement(CounterId.fromInt(4), 4L, 4L); - right.writeElement(CounterId.fromInt(5), 5L, 5L); - right.writeElement(CounterId.getLocalId(), 9L, 9L, true); - - ByteBuffer merged = cc.merge(left.context, right.context, allocator); - - // 127.0.0.1: 12 (3+9) - // 0.0.0.1: 1 - // 0.0.0.2: 2 - // 0.0.0.4: 4 - // 0.0.0.5: 5 - - assertEquals(24L, cc.total(merged)); - } - - @Test - public void testMergeOldShards() - { - runMergeOldShards(HeapAllocator.instance); - runMergeOldShards(bumpedSlab()); - } - - private void runMergeOldShards(Allocator allocator) - { - long now = System.currentTimeMillis(); - CounterId id1 = CounterId.fromInt(1); - CounterId id3 = CounterId.fromInt(3); - List<CounterId.CounterIdRecord> records = new ArrayList<CounterId.CounterIdRecord>(); - records.add(new CounterId.CounterIdRecord(id1, 2L)); - records.add(new CounterId.CounterIdRecord(id3, 4L)); - - ContextState ctx = ContextState.allocate(5, 3, allocator); - ctx.writeElement(id1, 1L, 1L, true); - ctx.writeElement(CounterId.fromInt(2), 2L, 2L); - ctx.writeElement(id3, 3L, 3L, true); - ctx.writeElement(CounterId.fromInt(4), 6L, 3L); - ctx.writeElement(CounterId.fromInt(5), 7L, 3L, true); - - ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records, Integer.MAX_VALUE); - - ContextState m = new ContextState(merger); - - assert m.getCounterId().equals(id1); - assert m.getClock() <= -now; - assert m.getCount() == -1L; - assert m.isDelta(); - m.moveToNext(); - assert m.getCounterId().equals(id3); - assert m.getClock() <= -now; - assert m.getCount() == -3L; - assert m.isDelta(); - m.moveToNext(); - assert m.getCounterId().equals(CounterId.getLocalId()); - assert m.getClock() == 1L; - assert m.getCount() == 4L; - assert m.isDelta(); - assert cc.total(ctx.context) == cc.total(cc.merge(ctx.context, merger, allocator)); + ContextState mixed = ContextState.allocate(0, 1, 4, allocator); + mixed.writeRemote(CounterId.fromInt(1), 1L, 1L); + mixed.writeRemote(CounterId.fromInt(2), 2L, 2L); + mixed.writeRemote(CounterId.fromInt(4), 4L, 4L); + mixed.writeRemote(CounterId.fromInt(5), 5L, 5L); + mixed.writeLocal(CounterId.getLocalId(), 12L, 12L); + assertEquals(24L, cc.total(mixed.context)); + + ContextState global = ContextState.allocate(3, 0, 0, allocator); + global.writeGlobal(CounterId.fromInt(1), 1L, 1L); + global.writeGlobal(CounterId.fromInt(2), 2L, 2L); + global.writeGlobal(CounterId.fromInt(3), 3L, 3L); + assertEquals(6L, cc.total(global.context)); } @Test - public void testRemoveOldShards() - { - runRemoveOldShards(HeapAllocator.instance); - runRemoveOldShards(bumpedSlab()); - } - - private void runRemoveOldShards(Allocator allocator) - { - CounterId id1 = CounterId.fromInt(1); - CounterId id3 = CounterId.fromInt(3); - CounterId id6 = CounterId.fromInt(6); - List<CounterId.CounterIdRecord> records = new ArrayList<CounterId.CounterIdRecord>(); - records.add(new CounterId.CounterIdRecord(id1, 2L)); - records.add(new CounterId.CounterIdRecord(id3, 4L)); - records.add(new CounterId.CounterIdRecord(id6, 10L)); - - ContextState ctx = ContextState.allocate(6, 3, allocator); - ctx.writeElement(id1, 1L, 1L, true); - ctx.writeElement(CounterId.fromInt(2), 2L, 2L); - ctx.writeElement(id3, 3L, 3L, true); - ctx.writeElement(CounterId.fromInt(4), 6L, 3L); - ctx.writeElement(CounterId.fromInt(5), 7L, 3L, true); - ctx.writeElement(id6, 5L, 6L); - - ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records, Integer.MAX_VALUE); - ByteBuffer merged = cc.merge(ctx.context, merger, allocator); - assert cc.total(ctx.context) == cc.total(merged); - - ByteBuffer cleaned = cc.removeOldShards(merged, (int)(System.currentTimeMillis() / 1000) + 1); - assert cc.total(ctx.context) == cc.total(cleaned); - assert cleaned.remaining() == ctx.context.remaining() - stepLength - 2; - } - - @Test - public void testRemoveOldShardsNotAllExpiring() - { - runRemoveOldShardsNotAllExpiring(HeapAllocator.instance); - runRemoveOldShardsNotAllExpiring(bumpedSlab()); - } - - private void runRemoveOldShardsNotAllExpiring(Allocator allocator) - { - CounterId id1 = CounterId.fromInt(1); - CounterId id3 = CounterId.fromInt(3); - CounterId id6 = CounterId.fromInt(6); - List<CounterId.CounterIdRecord> records = new ArrayList<CounterId.CounterIdRecord>(); - records.add(new CounterId.CounterIdRecord(id1, 2L)); - records.add(new CounterId.CounterIdRecord(id3, 4L)); - records.add(new CounterId.CounterIdRecord(id6, 10L)); - - ContextState ctx = ContextState.allocate(6, 3, allocator); - ctx.writeElement(id1, 0L, 1L, true); - ctx.writeElement(CounterId.fromInt(2), 0L, 2L); - ctx.writeElement(id3, 0L, 3L, true); - ctx.writeElement(CounterId.fromInt(4), 0L, 3L); - ctx.writeElement(CounterId.fromInt(5), 0L, 3L, true); - ctx.writeElement(id6, 0L, 6L); - - int timeFirstMerge = (int)(System.currentTimeMillis() / 1000); - - // First, only merge the first id - ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records, 3L); - ByteBuffer merged = cc.merge(ctx.context, merger, allocator); - assert cc.total(ctx.context) == cc.total(merged); - - Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); - - // merge the second one - ByteBuffer merger2 = cc.computeOldShardMerger(merged, records, 7L); - ByteBuffer merged2 = cc.merge(merged, merger2, allocator); - assert cc.total(ctx.context) == cc.total(merged2); - - ByteBuffer cleaned = cc.removeOldShards(merged2, timeFirstMerge + 1); - assert cc.total(ctx.context) == cc.total(cleaned); - assert cleaned.remaining() == ctx.context.remaining(); - - // We should have cleaned id1 but not id3 - ContextState m = new ContextState(cleaned); - m.moveToNext(); - assert m.getCounterId().equals(id3); - - } - - @Test - public void testRemoveNotDeltaOldShards() - { - runRemoveNotDeltaOldShards(HeapAllocator.instance); - runRemoveNotDeltaOldShards(bumpedSlab()); - } - - private void runRemoveNotDeltaOldShards(Allocator allocator) + public void testClearLocal() { - ContextState ctx = ContextState.allocate(4, 1, allocator); - ctx.writeElement(CounterId.fromInt(1), 1L, 1L, true); - ctx.writeElement(CounterId.fromInt(2), -System.currentTimeMillis(), 0L); - ctx.writeElement(CounterId.fromInt(3), -System.currentTimeMillis(), 0L); - ctx.writeElement(CounterId.fromInt(4), -System.currentTimeMillis(), 0L); - - ByteBuffer cleaned = cc.removeOldShards(ctx.context, (int)(System.currentTimeMillis() / 1000) + 1); - assert cc.total(ctx.context) == cc.total(cleaned); - assert cleaned.remaining() == ctx.context.remaining() - 3 * stepLength; + ContextState state; + ByteBuffer marked; + ByteBuffer cleared; + Allocator allocator = HeapAllocator.instance; + + // mark/clear for remote-only contexts is a no-op + state = ContextState.allocate(0, 0, 1, allocator); + state.writeRemote(CounterId.fromInt(1), 1L, 1L); + + assertFalse(cc.shouldClearLocal(state.context)); + marked = cc.markLocalToBeCleared(state.context); + assertEquals(0, marked.getShort(marked.position())); + assertSame(state.context, marked); // should return the original context + + cleared = cc.clearAllLocal(marked); + assertSame(cleared, marked); // shouldn't alter anything either + + // a single local shard + state = ContextState.allocate(0, 1, 0, allocator); + state.writeLocal(CounterId.fromInt(1), 1L, 1L); + + assertFalse(cc.shouldClearLocal(state.context)); + marked = cc.markLocalToBeCleared(state.context); + assertTrue(cc.shouldClearLocal(marked)); + assertEquals(-1, marked.getShort(marked.position())); + assertNotSame(state.context, marked); // shouldn't alter in place, as it used to do + + cleared = cc.clearAllLocal(marked); + assertFalse(cc.shouldClearLocal(cleared)); + assertEquals(0, cleared.getShort(cleared.position())); + + // 2 global + 1 local shard + state = ContextState.allocate(2, 1, 0, allocator); + state.writeLocal(CounterId.fromInt(1), 1L, 1L); + state.writeGlobal(CounterId.fromInt(2), 2L, 2L); + state.writeGlobal(CounterId.fromInt(3), 3L, 3L); + + assertFalse(cc.shouldClearLocal(state.context)); + marked = cc.markLocalToBeCleared(state.context); + assertTrue(cc.shouldClearLocal(marked)); + + assertEquals(-3, marked.getShort(marked.position())); + assertEquals(0, marked.getShort(marked.position() + headerSizeLength)); + assertEquals(Short.MIN_VALUE + 1, marked.getShort(marked.position() + headerSizeLength + headerEltLength)); + assertEquals(Short.MIN_VALUE + 2, marked.getShort(marked.position() + headerSizeLength + 2 * headerEltLength)); + + int headerLength = headerSizeLength + 3 * headerEltLength; + assertTrue(Util.equalsCounterId(CounterId.fromInt(1), marked, headerLength)); + assertEquals(1L, marked.getLong(marked.position() + headerLength + idLength)); + assertEquals(1L, marked.getLong(marked.position() + headerLength + idLength + clockLength)); + + assertTrue(Util.equalsCounterId(CounterId.fromInt(2), marked, headerLength + stepLength)); + assertEquals(2L, marked.getLong(marked.position() + headerLength + stepLength + idLength)); + assertEquals(2L, marked.getLong(marked.position() + headerLength + stepLength + idLength + clockLength)); + + assertTrue(Util.equalsCounterId(CounterId.fromInt(3), marked, headerLength + 2 * stepLength)); + assertEquals(3L, marked.getLong(marked.position() + headerLength + 2 * stepLength + idLength)); + assertEquals(3L, marked.getLong(marked.position() + headerLength + 2 * stepLength + idLength + clockLength)); + + cleared = cc.clearAllLocal(marked); + assertFalse(cc.shouldClearLocal(cleared)); + + assertEquals(2, cleared.getShort(cleared.position())); // 2 global shards + assertEquals(Short.MIN_VALUE + 1, cleared.getShort(marked.position() + headerEltLength)); + assertEquals(Short.MIN_VALUE + 2, cleared.getShort(marked.position() + headerSizeLength + headerEltLength)); + + headerLength = headerSizeLength + 2 * headerEltLength; + assertTrue(Util.equalsCounterId(CounterId.fromInt(1), cleared, headerLength)); + assertEquals(1L, cleared.getLong(cleared.position() + headerLength + idLength)); + assertEquals(1L, cleared.getLong(cleared.position() + headerLength + idLength + clockLength)); + + assertTrue(Util.equalsCounterId(CounterId.fromInt(2), cleared, headerLength + stepLength)); + assertEquals(2L, cleared.getLong(cleared.position() + headerLength + stepLength + idLength)); + assertEquals(2L, cleared.getLong(cleared.position() + headerLength + stepLength + idLength + clockLength)); + + assertTrue(Util.equalsCounterId(CounterId.fromInt(3), cleared, headerLength + 2 * stepLength)); + assertEquals(3L, cleared.getLong(cleared.position() + headerLength + 2 * stepLength + idLength)); + assertEquals(3L, cleared.getLong(cleared.position() + headerLength + 2 * stepLength + idLength + clockLength)); + + // a single global shard - no-op + state = ContextState.allocate(1, 0, 0, allocator); + state.writeGlobal(CounterId.fromInt(1), 1L, 1L); + + assertFalse(cc.shouldClearLocal(state.context)); + marked = cc.markLocalToBeCleared(state.context); + assertEquals(1, marked.getShort(marked.position())); + assertSame(state.context, marked); + + cleared = cc.clearAllLocal(marked); + assertSame(cleared, marked); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index 7622728..4cd578d 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -52,6 +52,7 @@ import org.apache.cassandra.thrift.IndexOperator; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.CounterId; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.HeapAllocator; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -316,17 +317,13 @@ public class StreamingTransferTest extends SchemaLoader Map<String, ColumnFamily> entries = new HashMap<>(); ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfs.metadata); ColumnFamily cfCleaned = TreeMapBackedSortedColumns.factory.create(cfs.metadata); - CounterContext.ContextState state = CounterContext.ContextState.allocate(4, 1); - state.writeElement(CounterId.fromInt(2), 9L, 3L, true); - state.writeElement(CounterId.fromInt(4), 4L, 2L); - state.writeElement(CounterId.fromInt(6), 3L, 3L); - state.writeElement(CounterId.fromInt(8), 2L, 4L); - cf.addColumn(new CounterColumn(ByteBufferUtil.bytes(col), - state.context, - timestamp)); - cfCleaned.addColumn(new CounterColumn(ByteBufferUtil.bytes(col), - cc.clearAllDelta(state.context), - timestamp)); + CounterContext.ContextState state = CounterContext.ContextState.allocate(0, 1, 3, HeapAllocator.instance); + state.writeLocal(CounterId.fromInt(2), 9L, 3L); + state.writeRemote(CounterId.fromInt(4), 4L, 2L); + state.writeRemote(CounterId.fromInt(6), 3L, 3L); + state.writeRemote(CounterId.fromInt(8), 2L, 4L); + cf.addColumn(new CounterColumn(ByteBufferUtil.bytes(col), state.context, timestamp)); + cfCleaned.addColumn(new CounterColumn(ByteBufferUtil.bytes(col), cc.clearAllLocal(state.context), timestamp)); entries.put(key, cf); cleanedEntries.put(key, cfCleaned);