Updated Branches:
  refs/heads/trunk 7faff8db7 -> a0fb58f6f

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/context/CounterContextTest.java 
b/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
index 0d05340..5c88fd6 100644
--- a/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
+++ b/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
@@ -23,36 +23,25 @@ package org.apache.cassandra.db.context;
 import static org.junit.Assert.*;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
-import org.apache.cassandra.Util;
 
 import org.apache.cassandra.db.context.IContext.ContextRelationship;
-import static org.apache.cassandra.db.context.CounterContext.ContextState;
+import org.apache.cassandra.Util;
 import org.apache.cassandra.utils.*;
 
-import com.google.common.util.concurrent.Uninterruptibles;
+import static org.apache.cassandra.db.context.CounterContext.ContextState;
 
 public class CounterContextTest
 {
     private static final CounterContext cc = new CounterContext();
 
-    private static final int idLength;
-    private static final int clockLength;
-    private static final int countLength;
-
-    private static final int stepLength;
-
-    static
-    {
-        idLength       = CounterId.LENGTH; // size of int
-        clockLength    = 8; // size of long
-        countLength    = 8; // size of long
-        stepLength     = idLength + clockLength + countLength;
-    }
+    private static final int headerSizeLength = 2;
+    private static final int headerEltLength = 2;
+    private static final int idLength = 16;
+    private static final int clockLength = 8;
+    private static final int countLength = 8;
+    private static final int stepLength = idLength + clockLength + countLength;
 
     /** Allocates 1 byte from a new SlabAllocator and returns it. */
     private Allocator bumpedSlab()
@@ -63,16 +52,25 @@ public class CounterContextTest
     }
 
     @Test
-    public void testCreate()
+    public void testAllocate()
     {
-        runCreate(HeapAllocator.instance);
-        runCreate(bumpedSlab());
+        runAllocate(HeapAllocator.instance);
+        runAllocate(bumpedSlab());
     }
 
-    private void runCreate(Allocator allocator)
+    private void runAllocate(Allocator allocator)
     {
-        ByteBuffer bytes = cc.create(4, allocator);
-        assertEquals(stepLength + 4, bytes.remaining());
+        ContextState allGlobal = ContextState.allocate(3, 0, 0, allocator);
+        assertEquals(headerSizeLength + 3 * headerEltLength + 3 * stepLength, 
allGlobal.context.remaining());
+
+        ContextState allLocal = ContextState.allocate(0, 3, 0, allocator);
+        assertEquals(headerSizeLength + 3 * headerEltLength + 3 * stepLength, 
allLocal.context.remaining());
+
+        ContextState allRemote = ContextState.allocate(0, 0, 3, allocator);
+        assertEquals(headerSizeLength + 3 * stepLength, 
allRemote.context.remaining());
+
+        ContextState mixed = ContextState.allocate(1, 1, 1, allocator);
+        assertEquals(headerSizeLength + 2 * headerEltLength + 3 * stepLength, 
mixed.context.remaining());
     }
 
     @Test
@@ -84,188 +82,176 @@ public class CounterContextTest
 
     private void runDiff(Allocator allocator)
     {
-        ContextState left = ContextState.allocate(3, 0, allocator);
+        ContextState left;
         ContextState right;
 
         // equality: equal nodes, all counts same
-        left.writeElement(CounterId.fromInt(3), 3L, 0L);
-        left.writeElement(CounterId.fromInt(6), 2L, 0L);
-        left.writeElement(CounterId.fromInt(9), 1L, 0L);
-        right = new ContextState(ByteBufferUtil.clone(left.context), 
left.headerLength);
+        left = ContextState.allocate(0, 0, 3, allocator);
+        left.writeRemote(CounterId.fromInt(3), 3L, 0L);
+        left.writeRemote(CounterId.fromInt(6), 2L, 0L);
+        left.writeRemote(CounterId.fromInt(9), 1L, 0L);
+        right = ContextState.wrap(ByteBufferUtil.clone(left.context));
 
-        assert ContextRelationship.EQUAL ==
-            cc.diff(left.context, right.context);
+        assertEquals(ContextRelationship.EQUAL, cc.diff(left.context, 
right.context));
 
         // greater than: left has superset of nodes (counts equal)
-        left = ContextState.allocate(4, 0, allocator);
-        left.writeElement(CounterId.fromInt(3),  3L, 0L);
-        left.writeElement(CounterId.fromInt(6),  2L, 0L);
-        left.writeElement(CounterId.fromInt(9),  1L, 0L);
-        left.writeElement(CounterId.fromInt(12), 0L, 0L);
+        left = ContextState.allocate(0, 0, 4, allocator);
+        left.writeRemote(CounterId.fromInt(3),  3L, 0L);
+        left.writeRemote(CounterId.fromInt(6),  2L, 0L);
+        left.writeRemote(CounterId.fromInt(9),  1L, 0L);
+        left.writeRemote(CounterId.fromInt(12), 0L, 0L);
 
-        right = ContextState.allocate(3, 0, allocator);
-        right.writeElement(CounterId.fromInt(3), 3L, 0L);
-        right.writeElement(CounterId.fromInt(6), 2L, 0L);
-        right.writeElement(CounterId.fromInt(9), 1L, 0L);
+        right = ContextState.allocate(0, 0, 3, allocator);
+        right.writeRemote(CounterId.fromInt(3), 3L, 0L);
+        right.writeRemote(CounterId.fromInt(6), 2L, 0L);
+        right.writeRemote(CounterId.fromInt(9), 1L, 0L);
 
-        assert ContextRelationship.GREATER_THAN ==
-            cc.diff(left.context, right.context);
+        assertEquals(ContextRelationship.GREATER_THAN, cc.diff(left.context, 
right.context));
 
         // less than: left has subset of nodes (counts equal)
-        left = ContextState.allocate(3, 0, allocator);
-        left.writeElement(CounterId.fromInt(3), 3L, 0L);
-        left.writeElement(CounterId.fromInt(6), 2L, 0L);
-        left.writeElement(CounterId.fromInt(9), 1L, 0L);
+        left = ContextState.allocate(0, 0, 3, allocator);
+        left.writeRemote(CounterId.fromInt(3), 3L, 0L);
+        left.writeRemote(CounterId.fromInt(6), 2L, 0L);
+        left.writeRemote(CounterId.fromInt(9), 1L, 0L);
 
-        right = ContextState.allocate(4, 0, allocator);
-        right.writeElement(CounterId.fromInt(3),  3L, 0L);
-        right.writeElement(CounterId.fromInt(6),  2L, 0L);
-        right.writeElement(CounterId.fromInt(9),  1L, 0L);
-        right.writeElement(CounterId.fromInt(12), 0L, 0L);
+        right = ContextState.allocate(0, 0, 4, allocator);
+        right.writeRemote(CounterId.fromInt(3),  3L, 0L);
+        right.writeRemote(CounterId.fromInt(6),  2L, 0L);
+        right.writeRemote(CounterId.fromInt(9),  1L, 0L);
+        right.writeRemote(CounterId.fromInt(12), 0L, 0L);
 
-        assert ContextRelationship.LESS_THAN ==
-            cc.diff(left.context, right.context);
+        assertEquals(ContextRelationship.LESS_THAN, cc.diff(left.context, 
right.context));
 
         // greater than: equal nodes, but left has higher counts
-        left = ContextState.allocate(3, 0, allocator);
-        left.writeElement(CounterId.fromInt(3), 3L, 0L);
-        left.writeElement(CounterId.fromInt(6), 2L, 0L);
-        left.writeElement(CounterId.fromInt(9), 3L, 0L);
+        left = ContextState.allocate(0, 0, 3, allocator);
+        left.writeRemote(CounterId.fromInt(3), 3L, 0L);
+        left.writeRemote(CounterId.fromInt(6), 2L, 0L);
+        left.writeRemote(CounterId.fromInt(9), 3L, 0L);
 
-        right = ContextState.allocate(3, 0, allocator);
-        right.writeElement(CounterId.fromInt(3), 3L, 0L);
-        right.writeElement(CounterId.fromInt(6), 2L, 0L);
-        right.writeElement(CounterId.fromInt(9), 1L, 0L);
+        right = ContextState.allocate(0, 0, 3, allocator);
+        right.writeRemote(CounterId.fromInt(3), 3L, 0L);
+        right.writeRemote(CounterId.fromInt(6), 2L, 0L);
+        right.writeRemote(CounterId.fromInt(9), 1L, 0L);
 
-        assert ContextRelationship.GREATER_THAN ==
-            cc.diff(left.context, right.context);
+        assertEquals(ContextRelationship.GREATER_THAN, cc.diff(left.context, 
right.context));
 
         // less than: equal nodes, but right has higher counts
-        left = ContextState.allocate(3, 0, allocator);
-        left.writeElement(CounterId.fromInt(3), 3L, 0L);
-        left.writeElement(CounterId.fromInt(6), 2L, 0L);
-        left.writeElement(CounterId.fromInt(9), 3L, 0L);
+        left = ContextState.allocate(0, 0, 3, allocator);
+        left.writeRemote(CounterId.fromInt(3), 3L, 0L);
+        left.writeRemote(CounterId.fromInt(6), 2L, 0L);
+        left.writeRemote(CounterId.fromInt(9), 3L, 0L);
 
-        right = ContextState.allocate(3, 0, allocator);
-        right.writeElement(CounterId.fromInt(3), 3L, 0L);
-        right.writeElement(CounterId.fromInt(6), 9L, 0L);
-        right.writeElement(CounterId.fromInt(9), 3L, 0L);
+        right = ContextState.allocate(0, 0, 3, allocator);
+        right.writeRemote(CounterId.fromInt(3), 3L, 0L);
+        right.writeRemote(CounterId.fromInt(6), 9L, 0L);
+        right.writeRemote(CounterId.fromInt(9), 3L, 0L);
 
-        assert ContextRelationship.LESS_THAN ==
-            cc.diff(left.context, right.context);
+        assertEquals(ContextRelationship.LESS_THAN, cc.diff(left.context, 
right.context));
 
         // disjoint: right and left have disjoint node sets
-        left = ContextState.allocate(3, 0, allocator);
-        left.writeElement(CounterId.fromInt(3), 1L, 0L);
-        left.writeElement(CounterId.fromInt(4), 1L, 0L);
-        left.writeElement(CounterId.fromInt(9), 1L, 0L);
+        left = ContextState.allocate(0, 0, 3, allocator);
+        left.writeRemote(CounterId.fromInt(3), 1L, 0L);
+        left.writeRemote(CounterId.fromInt(4), 1L, 0L);
+        left.writeRemote(CounterId.fromInt(9), 1L, 0L);
 
-        right = ContextState.allocate(3, 0, allocator);
-        right.writeElement(CounterId.fromInt(3), 1L, 0L);
-        right.writeElement(CounterId.fromInt(6), 1L, 0L);
-        right.writeElement(CounterId.fromInt(9), 1L, 0L);
+        right = ContextState.allocate(0, 0, 3, allocator);
+        right.writeRemote(CounterId.fromInt(3), 1L, 0L);
+        right.writeRemote(CounterId.fromInt(6), 1L, 0L);
+        right.writeRemote(CounterId.fromInt(9), 1L, 0L);
 
-        assert ContextRelationship.DISJOINT ==
-            cc.diff(left.context, right.context);
+        assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, 
right.context));
 
-        left = ContextState.allocate(3, 0, allocator);
-        left.writeElement(CounterId.fromInt(3), 1L, 0L);
-        left.writeElement(CounterId.fromInt(4), 1L, 0L);
-        left.writeElement(CounterId.fromInt(9), 1L, 0L);
+        left = ContextState.allocate(0, 0, 3, allocator);
+        left.writeRemote(CounterId.fromInt(3), 1L, 0L);
+        left.writeRemote(CounterId.fromInt(4), 1L, 0L);
+        left.writeRemote(CounterId.fromInt(9), 1L, 0L);
 
-        right = ContextState.allocate(3, 0, allocator);
-        right.writeElement(CounterId.fromInt(2),  1L, 0L);
-        right.writeElement(CounterId.fromInt(6),  1L, 0L);
-        right.writeElement(CounterId.fromInt(12), 1L, 0L);
+        right = ContextState.allocate(0, 0, 3, allocator);
+        right.writeRemote(CounterId.fromInt(2),  1L, 0L);
+        right.writeRemote(CounterId.fromInt(6),  1L, 0L);
+        right.writeRemote(CounterId.fromInt(12), 1L, 0L);
 
-        assert ContextRelationship.DISJOINT ==
-            cc.diff(left.context, right.context);
+        assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, 
right.context));
 
         // disjoint: equal nodes, but right and left have higher counts in 
differing nodes
-        left = ContextState.allocate(3, 0, allocator);
-        left.writeElement(CounterId.fromInt(3), 1L, 0L);
-        left.writeElement(CounterId.fromInt(6), 3L, 0L);
-        left.writeElement(CounterId.fromInt(9), 1L, 0L);
+        left = ContextState.allocate(0, 0, 3, allocator);
+        left.writeRemote(CounterId.fromInt(3), 1L, 0L);
+        left.writeRemote(CounterId.fromInt(6), 3L, 0L);
+        left.writeRemote(CounterId.fromInt(9), 1L, 0L);
 
-        right = ContextState.allocate(3, 0, allocator);
-        right.writeElement(CounterId.fromInt(3), 1L, 0L);
-        right.writeElement(CounterId.fromInt(6), 1L, 0L);
-        right.writeElement(CounterId.fromInt(9), 5L, 0L);
+        right = ContextState.allocate(0, 0, 3, allocator);
+        right.writeRemote(CounterId.fromInt(3), 1L, 0L);
+        right.writeRemote(CounterId.fromInt(6), 1L, 0L);
+        right.writeRemote(CounterId.fromInt(9), 5L, 0L);
 
-        assert ContextRelationship.DISJOINT ==
-            cc.diff(left.context, right.context);
+        assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, 
right.context));
 
-        left = ContextState.allocate(3, 0, allocator);
-        left.writeElement(CounterId.fromInt(3), 2L, 0L);
-        left.writeElement(CounterId.fromInt(6), 3L, 0L);
-        left.writeElement(CounterId.fromInt(9), 1L, 0L);
+        left = ContextState.allocate(0, 0, 3, allocator);
+        left.writeRemote(CounterId.fromInt(3), 2L, 0L);
+        left.writeRemote(CounterId.fromInt(6), 3L, 0L);
+        left.writeRemote(CounterId.fromInt(9), 1L, 0L);
 
-        right = ContextState.allocate(3, 0, allocator);
-        right.writeElement(CounterId.fromInt(3), 1L, 0L);
-        right.writeElement(CounterId.fromInt(6), 9L, 0L);
-        right.writeElement(CounterId.fromInt(9), 5L, 0L);
+        right = ContextState.allocate(0, 0, 3, allocator);
+        right.writeRemote(CounterId.fromInt(3), 1L, 0L);
+        right.writeRemote(CounterId.fromInt(6), 9L, 0L);
+        right.writeRemote(CounterId.fromInt(9), 5L, 0L);
 
-        assert ContextRelationship.DISJOINT ==
-            cc.diff(left.context, right.context);
+        assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, 
right.context));
 
         // disjoint: left has more nodes, but lower counts
-        left = ContextState.allocate(4, 0, allocator);
-        left.writeElement(CounterId.fromInt(3),  2L, 0L);
-        left.writeElement(CounterId.fromInt(6),  3L, 0L);
-        left.writeElement(CounterId.fromInt(9),  1L, 0L);
-        left.writeElement(CounterId.fromInt(12), 1L, 0L);
+        left = ContextState.allocate(0, 0, 4, allocator);
+        left.writeRemote(CounterId.fromInt(3),  2L, 0L);
+        left.writeRemote(CounterId.fromInt(6),  3L, 0L);
+        left.writeRemote(CounterId.fromInt(9),  1L, 0L);
+        left.writeRemote(CounterId.fromInt(12), 1L, 0L);
 
-        right = ContextState.allocate(3, 0, allocator);
-        right.writeElement(CounterId.fromInt(3), 4L, 0L);
-        right.writeElement(CounterId.fromInt(6), 9L, 0L);
-        right.writeElement(CounterId.fromInt(9), 5L, 0L);
+        right = ContextState.allocate(0, 0, 3, allocator);
+        right.writeRemote(CounterId.fromInt(3), 4L, 0L);
+        right.writeRemote(CounterId.fromInt(6), 9L, 0L);
+        right.writeRemote(CounterId.fromInt(9), 5L, 0L);
 
-        assert ContextRelationship.DISJOINT ==
-            cc.diff(left.context, right.context);
+        assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, 
right.context));
 
         // disjoint: left has less nodes, but higher counts
-        left = ContextState.allocate(3, 0, allocator);
-        left.writeElement(CounterId.fromInt(3), 5L, 0L);
-        left.writeElement(CounterId.fromInt(6), 3L, 0L);
-        left.writeElement(CounterId.fromInt(9), 2L, 0L);
+        left = ContextState.allocate(0, 0, 3, allocator);
+        left.writeRemote(CounterId.fromInt(3), 5L, 0L);
+        left.writeRemote(CounterId.fromInt(6), 3L, 0L);
+        left.writeRemote(CounterId.fromInt(9), 2L, 0L);
 
-        right = ContextState.allocate(4, 0, allocator);
-        right.writeElement(CounterId.fromInt(3),  4L, 0L);
-        right.writeElement(CounterId.fromInt(6),  3L, 0L);
-        right.writeElement(CounterId.fromInt(9),  2L, 0L);
-        right.writeElement(CounterId.fromInt(12), 1L, 0L);
+        right = ContextState.allocate(0, 0, 4, allocator);
+        right.writeRemote(CounterId.fromInt(3),  4L, 0L);
+        right.writeRemote(CounterId.fromInt(6),  3L, 0L);
+        right.writeRemote(CounterId.fromInt(9),  2L, 0L);
+        right.writeRemote(CounterId.fromInt(12), 1L, 0L);
 
-        assert ContextRelationship.DISJOINT ==
-            cc.diff(left.context, right.context);
+        assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, 
right.context));
 
         // disjoint: mixed nodes and counts
-        left = ContextState.allocate(3, 0, allocator);
-        left.writeElement(CounterId.fromInt(3), 5L, 0L);
-        left.writeElement(CounterId.fromInt(6), 2L, 0L);
-        left.writeElement(CounterId.fromInt(9), 2L, 0L);
-
-        right = ContextState.allocate(4, 0, allocator);
-        right.writeElement(CounterId.fromInt(3),  4L, 0L);
-        right.writeElement(CounterId.fromInt(6),  3L, 0L);
-        right.writeElement(CounterId.fromInt(9),  2L, 0L);
-        right.writeElement(CounterId.fromInt(12), 1L, 0L);
-
-        assert ContextRelationship.DISJOINT ==
-            cc.diff(left.context, right.context);
-
-        left = ContextState.allocate(4, 0, allocator);
-        left.writeElement(CounterId.fromInt(3), 5L, 0L);
-        left.writeElement(CounterId.fromInt(6), 2L, 0L);
-        left.writeElement(CounterId.fromInt(7), 2L, 0L);
-        left.writeElement(CounterId.fromInt(9), 2L, 0L);
-
-        right = ContextState.allocate(3, 0, allocator);
-        right.writeElement(CounterId.fromInt(3), 4L, 0L);
-        right.writeElement(CounterId.fromInt(6), 3L, 0L);
-        right.writeElement(CounterId.fromInt(9), 2L, 0L);
-
-        assert ContextRelationship.DISJOINT ==
-            cc.diff(left.context, right.context);
+        left = ContextState.allocate(0, 0, 3, allocator);
+        left.writeRemote(CounterId.fromInt(3), 5L, 0L);
+        left.writeRemote(CounterId.fromInt(6), 2L, 0L);
+        left.writeRemote(CounterId.fromInt(9), 2L, 0L);
+
+        right = ContextState.allocate(0, 0, 4, allocator);
+        right.writeRemote(CounterId.fromInt(3),  4L, 0L);
+        right.writeRemote(CounterId.fromInt(6),  3L, 0L);
+        right.writeRemote(CounterId.fromInt(9),  2L, 0L);
+        right.writeRemote(CounterId.fromInt(12), 1L, 0L);
+
+        assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, 
right.context));
+
+        left = ContextState.allocate(0, 0, 4, allocator);
+        left.writeRemote(CounterId.fromInt(3), 5L, 0L);
+        left.writeRemote(CounterId.fromInt(6), 2L, 0L);
+        left.writeRemote(CounterId.fromInt(7), 2L, 0L);
+        left.writeRemote(CounterId.fromInt(9), 2L, 0L);
+
+        right = ContextState.allocate(0, 0, 3, allocator);
+        right.writeRemote(CounterId.fromInt(3), 4L, 0L);
+        right.writeRemote(CounterId.fromInt(6), 3L, 0L);
+        right.writeRemote(CounterId.fromInt(9), 2L, 0L);
+
+        assertEquals(ContextRelationship.DISJOINT, cc.diff(left.context, 
right.context));
     }
 
     @Test
@@ -278,42 +264,121 @@ public class CounterContextTest
     private void runMerge(Allocator allocator)
     {
         // note: local counts aggregated; remote counts are reconciled (i.e. 
take max)
-        ContextState left = ContextState.allocate(4, 1, allocator);
-        left.writeElement(CounterId.fromInt(1), 1L, 1L);
-        left.writeElement(CounterId.fromInt(2), 2L, 2L);
-        left.writeElement(CounterId.fromInt(4), 6L, 3L);
-        left.writeElement(CounterId.getLocalId(), 7L, 3L, true);
+        ContextState left = ContextState.allocate(0, 1, 3, allocator);
+        left.writeRemote(CounterId.fromInt(1), 1L, 1L);
+        left.writeRemote(CounterId.fromInt(2), 2L, 2L);
+        left.writeRemote(CounterId.fromInt(4), 6L, 3L);
+        left.writeLocal(CounterId.getLocalId(), 7L, 3L);
 
-        ContextState right = ContextState.allocate(3, 1, allocator);
-        right.writeElement(CounterId.fromInt(4), 4L, 4L);
-        right.writeElement(CounterId.fromInt(5), 5L, 5L);
-        right.writeElement(CounterId.getLocalId(), 2L, 9L, true);
+        ContextState right = ContextState.allocate(0, 1, 2, allocator);
+        right.writeRemote(CounterId.fromInt(4), 4L, 4L);
+        right.writeRemote(CounterId.fromInt(5), 5L, 5L);
+        right.writeLocal(CounterId.getLocalId(), 2L, 9L);
 
         ByteBuffer merged = cc.merge(left.context, right.context, allocator);
         int hd = 4;
 
         assertEquals(hd + 5 * stepLength, merged.remaining());
         // local node id's counts are aggregated
-        assert Util.equalsCounterId(CounterId.getLocalId(), merged, hd + 4 * 
stepLength);
-        assertEquals(  9L, merged.getLong(merged.position() + hd + 
4*stepLength + idLength));
+        assertTrue(Util.equalsCounterId(CounterId.getLocalId(), merged, hd + 4 
* stepLength));
+        assertEquals(9L, merged.getLong(merged.position() + hd + 4 * 
stepLength + idLength));
         assertEquals(12L,  merged.getLong(merged.position() + hd + 
4*stepLength + idLength + clockLength));
 
         // remote node id counts are reconciled (i.e. take max)
-        assert Util.equalsCounterId(CounterId.fromInt(4), merged, hd + 2 * 
stepLength);
-        assertEquals( 6L,  merged.getLong(merged.position() + hd + 
2*stepLength + idLength));
+        assertTrue(Util.equalsCounterId(CounterId.fromInt(4), merged, hd + 2 * 
stepLength));
+        assertEquals(6L, merged.getLong(merged.position() + hd + 2 * 
stepLength + idLength));
         assertEquals( 3L,  merged.getLong(merged.position() + hd + 
2*stepLength + idLength + clockLength));
 
-        assert Util.equalsCounterId(CounterId.fromInt(5), merged, hd + 3 * 
stepLength);
-        assertEquals( 5L,  merged.getLong(merged.position() + hd + 
3*stepLength + idLength));
+        assertTrue(Util.equalsCounterId(CounterId.fromInt(5), merged, hd + 3 * 
stepLength));
+        assertEquals(5L, merged.getLong(merged.position() + hd + 3 * 
stepLength + idLength));
         assertEquals( 5L,  merged.getLong(merged.position() + hd + 
3*stepLength + idLength + clockLength));
 
-        assert Util.equalsCounterId(CounterId.fromInt(2), merged, hd + 1 * 
stepLength);
-        assertEquals( 2L,  merged.getLong(merged.position() + hd + 
1*stepLength + idLength));
-        assertEquals( 2L,  merged.getLong(merged.position() + hd + 
1*stepLength + idLength + clockLength));
-
-        assert Util.equalsCounterId(CounterId.fromInt(1), merged, hd + 0 * 
stepLength);
-        assertEquals( 1L,  merged.getLong(merged.position() + hd + 
0*stepLength + idLength));
-        assertEquals( 1L,  merged.getLong(merged.position() + hd + 
0*stepLength + idLength + clockLength));
+        assertTrue(Util.equalsCounterId(CounterId.fromInt(2), merged, hd + 
stepLength));
+        assertEquals(2L, merged.getLong(merged.position() + hd + stepLength + 
idLength));
+        assertEquals( 2L,  merged.getLong(merged.position() + hd + stepLength 
+ idLength + clockLength));
+
+        assertTrue(Util.equalsCounterId(CounterId.fromInt(1), merged, hd));
+        assertEquals( 1L,  merged.getLong(merged.position() + hd + idLength));
+        assertEquals( 1L,  merged.getLong(merged.position() + hd + idLength + 
clockLength));
+
+        //
+        // Test merging two exclusively global contexts
+        //
+        left = ContextState.allocate(3, 0, 0, allocator);
+        left.writeGlobal(CounterId.fromInt(1), 1L, 1L);
+        left.writeGlobal(CounterId.fromInt(2), 2L, 2L);
+        left.writeGlobal(CounterId.fromInt(3), 3L, 3L);
+
+        right = ContextState.allocate(3, 0, 0, allocator);
+        right.writeGlobal(CounterId.fromInt(3), 6L, 6L);
+        right.writeGlobal(CounterId.fromInt(4), 4L, 4L);
+        right.writeGlobal(CounterId.fromInt(5), 5L, 5L);
+
+        merged = cc.merge(left.context, right.context, allocator);
+        assertEquals(headerSizeLength + 5 * headerEltLength + 5 * stepLength, 
merged.remaining());
+        assertEquals(18L, cc.total(merged));
+        assertEquals(5, merged.getShort(merged.position()));
+
+        int headerLength = headerSizeLength + 5 * headerEltLength;
+        assertTrue(Util.equalsCounterId(CounterId.fromInt(1), merged, 
headerLength));
+        assertEquals(1L, merged.getLong(merged.position() + headerLength + 
idLength));
+        assertEquals(1L, merged.getLong(merged.position() + headerLength + 
idLength + clockLength));
+        assertTrue(Util.equalsCounterId(CounterId.fromInt(2), merged, 
headerLength + stepLength));
+        assertEquals(2L, merged.getLong(merged.position() + headerLength + 
stepLength + idLength));
+        assertEquals(2L, merged.getLong(merged.position() + headerLength + 
stepLength + idLength + clockLength));
+        // pick the global shard with the largest clock
+        assertTrue(Util.equalsCounterId(CounterId.fromInt(3), merged, 
headerLength + 2 * stepLength));
+        assertEquals(6L, merged.getLong(merged.position() + headerLength + 2 * 
stepLength + idLength));
+        assertEquals(6L, merged.getLong(merged.position() + headerLength + 2 * 
stepLength + idLength + clockLength));
+        assertTrue(Util.equalsCounterId(CounterId.fromInt(4), merged, 
headerLength + 3 * stepLength));
+        assertEquals(4L, merged.getLong(merged.position() + headerLength + 3 * 
stepLength + idLength));
+        assertEquals(4L, merged.getLong(merged.position() + headerLength + 3 * 
stepLength + idLength + clockLength));
+        assertTrue(Util.equalsCounterId(CounterId.fromInt(5), merged, 
headerLength + 4 * stepLength));
+        assertEquals(5L, merged.getLong(merged.position() + headerLength + 4 * 
stepLength + idLength));
+        assertEquals(5L, merged.getLong(merged.position() + headerLength + 4 * 
stepLength + idLength + clockLength));
+
+        //
+        // Test merging two global contexts w/ 'invalid shards'
+        //
+        left = ContextState.allocate(1, 0, 0, allocator);
+        left.writeGlobal(CounterId.fromInt(1), 10L, 20L);
+
+        right = ContextState.allocate(1, 0, 0, allocator);
+        right.writeGlobal(CounterId.fromInt(1), 10L, 30L);
+
+        merged = cc.merge(left.context, right.context, allocator);
+        headerLength = headerSizeLength + headerEltLength;
+        assertEquals(headerLength + stepLength, merged.remaining());
+        assertEquals(30L, cc.total(merged));
+        assertEquals(1, merged.getShort(merged.position()));
+        assertTrue(Util.equalsCounterId(CounterId.fromInt(1), merged, 
headerLength));
+        assertEquals(10L, merged.getLong(merged.position() + headerLength + 
idLength));
+        // with equal clock, we should pick the largest value
+        assertEquals(30L, merged.getLong(merged.position() + headerLength + 
idLength + clockLength));
+
+        //
+        // Test merging global w/ mixed contexts
+        //
+        left = ContextState.allocate(2, 0, 0, allocator);
+        left.writeGlobal(CounterId.fromInt(1), 1L, 1L);
+        left.writeGlobal(CounterId.fromInt(2), 1L, 1L);
+
+        right = ContextState.allocate(0, 1, 1, allocator);
+        right.writeLocal(CounterId.fromInt(1), 100L, 100L);
+        right.writeRemote(CounterId.fromInt(2), 100L, 100L);
+
+        // global shards should dominate local/remote, even with lower clock 
and value
+        merged = cc.merge(left.context, right.context, allocator);
+        headerLength = headerSizeLength + 2 * headerEltLength;
+        assertEquals(headerLength + 2 * stepLength, merged.remaining());
+        assertEquals(2L, cc.total(merged));
+        assertEquals(2, merged.getShort(merged.position()));
+        assertTrue(Util.equalsCounterId(CounterId.fromInt(1), merged, 
headerLength));
+        assertEquals(1L, merged.getLong(merged.position() + headerLength + 
idLength));
+        assertEquals(1L, merged.getLong(merged.position() + headerLength + 
idLength + clockLength));
+        assertTrue(Util.equalsCounterId(CounterId.fromInt(2), merged, 
headerLength + stepLength));
+        assertEquals(1L, merged.getLong(merged.position() + headerLength + 
stepLength + idLength));
+        assertEquals(1L, merged.getLong(merged.position() + headerLength + 
stepLength + idLength + clockLength));
     }
 
     @Test
@@ -325,173 +390,113 @@ public class CounterContextTest
 
     private void runTotal(Allocator allocator)
     {
-        ContextState left = ContextState.allocate(4, 1, allocator);
-        left.writeElement(CounterId.fromInt(1), 1L, 1L);
-        left.writeElement(CounterId.fromInt(2), 2L, 2L);
-        left.writeElement(CounterId.fromInt(4), 3L, 3L);
-        left.writeElement(CounterId.getLocalId(), 3L, 3L, true);
-
-        ContextState right = ContextState.allocate(3, 1, allocator);
-        right.writeElement(CounterId.fromInt(4), 4L, 4L);
-        right.writeElement(CounterId.fromInt(5), 5L, 5L);
-        right.writeElement(CounterId.getLocalId(), 9L, 9L, true);
-
-        ByteBuffer merged = cc.merge(left.context, right.context, allocator);
-
-        // 127.0.0.1: 12 (3+9)
-        // 0.0.0.1:    1
-        // 0.0.0.2:    2
-        // 0.0.0.4:    4
-        // 0.0.0.5:    5
-
-        assertEquals(24L, cc.total(merged));
-    }
-
-    @Test
-    public void testMergeOldShards()
-    {
-        runMergeOldShards(HeapAllocator.instance);
-        runMergeOldShards(bumpedSlab());
-    }
-
-    private void runMergeOldShards(Allocator allocator)
-    {
-        long now = System.currentTimeMillis();
-        CounterId id1 = CounterId.fromInt(1);
-        CounterId id3 = CounterId.fromInt(3);
-        List<CounterId.CounterIdRecord> records = new 
ArrayList<CounterId.CounterIdRecord>();
-        records.add(new CounterId.CounterIdRecord(id1, 2L));
-        records.add(new CounterId.CounterIdRecord(id3, 4L));
-
-        ContextState ctx = ContextState.allocate(5, 3, allocator);
-        ctx.writeElement(id1, 1L, 1L, true);
-        ctx.writeElement(CounterId.fromInt(2), 2L, 2L);
-        ctx.writeElement(id3, 3L, 3L, true);
-        ctx.writeElement(CounterId.fromInt(4), 6L, 3L);
-        ctx.writeElement(CounterId.fromInt(5), 7L, 3L, true);
-
-        ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records, 
Integer.MAX_VALUE);
-
-        ContextState m = new ContextState(merger);
-
-        assert m.getCounterId().equals(id1);
-        assert m.getClock() <= -now;
-        assert m.getCount() == -1L;
-        assert m.isDelta();
-        m.moveToNext();
-        assert m.getCounterId().equals(id3);
-        assert m.getClock() <= -now;
-        assert m.getCount() == -3L;
-        assert m.isDelta();
-        m.moveToNext();
-        assert m.getCounterId().equals(CounterId.getLocalId());
-        assert m.getClock() == 1L;
-        assert m.getCount() == 4L;
-        assert m.isDelta();
-        assert cc.total(ctx.context) == cc.total(cc.merge(ctx.context, merger, 
allocator));
+        ContextState mixed = ContextState.allocate(0, 1, 4, allocator);
+        mixed.writeRemote(CounterId.fromInt(1), 1L, 1L);
+        mixed.writeRemote(CounterId.fromInt(2), 2L, 2L);
+        mixed.writeRemote(CounterId.fromInt(4), 4L, 4L);
+        mixed.writeRemote(CounterId.fromInt(5), 5L, 5L);
+        mixed.writeLocal(CounterId.getLocalId(), 12L, 12L);
+        assertEquals(24L, cc.total(mixed.context));
+
+        ContextState global = ContextState.allocate(3, 0, 0, allocator);
+        global.writeGlobal(CounterId.fromInt(1), 1L, 1L);
+        global.writeGlobal(CounterId.fromInt(2), 2L, 2L);
+        global.writeGlobal(CounterId.fromInt(3), 3L, 3L);
+        assertEquals(6L, cc.total(global.context));
     }
 
     @Test
-    public void testRemoveOldShards()
-    {
-        runRemoveOldShards(HeapAllocator.instance);
-        runRemoveOldShards(bumpedSlab());
-    }
-
-    private void runRemoveOldShards(Allocator allocator)
-    {
-        CounterId id1 = CounterId.fromInt(1);
-        CounterId id3 = CounterId.fromInt(3);
-        CounterId id6 = CounterId.fromInt(6);
-        List<CounterId.CounterIdRecord> records = new 
ArrayList<CounterId.CounterIdRecord>();
-        records.add(new CounterId.CounterIdRecord(id1, 2L));
-        records.add(new CounterId.CounterIdRecord(id3, 4L));
-        records.add(new CounterId.CounterIdRecord(id6, 10L));
-
-        ContextState ctx = ContextState.allocate(6, 3, allocator);
-        ctx.writeElement(id1, 1L, 1L, true);
-        ctx.writeElement(CounterId.fromInt(2), 2L, 2L);
-        ctx.writeElement(id3, 3L, 3L, true);
-        ctx.writeElement(CounterId.fromInt(4), 6L, 3L);
-        ctx.writeElement(CounterId.fromInt(5), 7L, 3L, true);
-        ctx.writeElement(id6, 5L, 6L);
-
-        ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records, 
Integer.MAX_VALUE);
-        ByteBuffer merged = cc.merge(ctx.context, merger, allocator);
-        assert cc.total(ctx.context) == cc.total(merged);
-
-        ByteBuffer cleaned = cc.removeOldShards(merged, 
(int)(System.currentTimeMillis() / 1000) + 1);
-        assert cc.total(ctx.context) == cc.total(cleaned);
-        assert cleaned.remaining() == ctx.context.remaining() - stepLength - 2;
-    }
-
-    @Test
-    public void testRemoveOldShardsNotAllExpiring()
-    {
-        runRemoveOldShardsNotAllExpiring(HeapAllocator.instance);
-        runRemoveOldShardsNotAllExpiring(bumpedSlab());
-    }
-
-    private void runRemoveOldShardsNotAllExpiring(Allocator allocator)
-    {
-        CounterId id1 = CounterId.fromInt(1);
-        CounterId id3 = CounterId.fromInt(3);
-        CounterId id6 = CounterId.fromInt(6);
-        List<CounterId.CounterIdRecord> records = new 
ArrayList<CounterId.CounterIdRecord>();
-        records.add(new CounterId.CounterIdRecord(id1, 2L));
-        records.add(new CounterId.CounterIdRecord(id3, 4L));
-        records.add(new CounterId.CounterIdRecord(id6, 10L));
-
-        ContextState ctx = ContextState.allocate(6, 3, allocator);
-        ctx.writeElement(id1, 0L, 1L, true);
-        ctx.writeElement(CounterId.fromInt(2), 0L, 2L);
-        ctx.writeElement(id3, 0L, 3L, true);
-        ctx.writeElement(CounterId.fromInt(4), 0L, 3L);
-        ctx.writeElement(CounterId.fromInt(5), 0L, 3L, true);
-        ctx.writeElement(id6, 0L, 6L);
-
-        int timeFirstMerge = (int)(System.currentTimeMillis() / 1000);
-
-        // First, only merge the first id
-        ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records, 3L);
-        ByteBuffer merged = cc.merge(ctx.context, merger, allocator);
-        assert cc.total(ctx.context) == cc.total(merged);
-
-        Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
-
-        // merge the second one
-        ByteBuffer merger2 = cc.computeOldShardMerger(merged, records, 7L);
-        ByteBuffer merged2 = cc.merge(merged, merger2, allocator);
-        assert cc.total(ctx.context) == cc.total(merged2);
-
-        ByteBuffer cleaned = cc.removeOldShards(merged2, timeFirstMerge + 1);
-        assert cc.total(ctx.context) == cc.total(cleaned);
-        assert cleaned.remaining() == ctx.context.remaining();
-
-        // We should have cleaned id1 but not id3
-        ContextState m = new ContextState(cleaned);
-        m.moveToNext();
-        assert m.getCounterId().equals(id3);
-
-    }
-
-    @Test
-    public void testRemoveNotDeltaOldShards()
-    {
-        runRemoveNotDeltaOldShards(HeapAllocator.instance);
-        runRemoveNotDeltaOldShards(bumpedSlab());
-    }
-
-    private void runRemoveNotDeltaOldShards(Allocator allocator)
+    public void testClearLocal()
     {
-        ContextState ctx = ContextState.allocate(4, 1, allocator);
-        ctx.writeElement(CounterId.fromInt(1), 1L, 1L, true);
-        ctx.writeElement(CounterId.fromInt(2), -System.currentTimeMillis(), 
0L);
-        ctx.writeElement(CounterId.fromInt(3), -System.currentTimeMillis(), 
0L);
-        ctx.writeElement(CounterId.fromInt(4), -System.currentTimeMillis(), 
0L);
-
-        ByteBuffer cleaned = cc.removeOldShards(ctx.context, 
(int)(System.currentTimeMillis() / 1000) + 1);
-        assert cc.total(ctx.context) == cc.total(cleaned);
-        assert cleaned.remaining() == ctx.context.remaining() - 3 * stepLength;
+        ContextState state;
+        ByteBuffer marked;
+        ByteBuffer cleared;
+        Allocator allocator = HeapAllocator.instance;
+
+        // mark/clear for remote-only contexts is a no-op
+        state = ContextState.allocate(0, 0, 1, allocator);
+        state.writeRemote(CounterId.fromInt(1), 1L, 1L);
+
+        assertFalse(cc.shouldClearLocal(state.context));
+        marked = cc.markLocalToBeCleared(state.context);
+        assertEquals(0, marked.getShort(marked.position()));
+        assertSame(state.context, marked); // should return the original 
context
+
+        cleared = cc.clearAllLocal(marked);
+        assertSame(cleared, marked); // shouldn't alter anything either
+
+        // a single local shard
+        state = ContextState.allocate(0, 1, 0, allocator);
+        state.writeLocal(CounterId.fromInt(1), 1L, 1L);
+
+        assertFalse(cc.shouldClearLocal(state.context));
+        marked = cc.markLocalToBeCleared(state.context);
+        assertTrue(cc.shouldClearLocal(marked));
+        assertEquals(-1, marked.getShort(marked.position()));
+        assertNotSame(state.context, marked); // shouldn't alter in place, as 
it used to do
+
+        cleared = cc.clearAllLocal(marked);
+        assertFalse(cc.shouldClearLocal(cleared));
+        assertEquals(0, cleared.getShort(cleared.position()));
+
+        // 2 global + 1 local shard
+        state = ContextState.allocate(2, 1, 0, allocator);
+        state.writeLocal(CounterId.fromInt(1), 1L, 1L);
+        state.writeGlobal(CounterId.fromInt(2), 2L, 2L);
+        state.writeGlobal(CounterId.fromInt(3), 3L, 3L);
+
+        assertFalse(cc.shouldClearLocal(state.context));
+        marked = cc.markLocalToBeCleared(state.context);
+        assertTrue(cc.shouldClearLocal(marked));
+
+        assertEquals(-3, marked.getShort(marked.position()));
+        assertEquals(0, marked.getShort(marked.position() + headerSizeLength));
+        assertEquals(Short.MIN_VALUE + 1, marked.getShort(marked.position() + 
headerSizeLength + headerEltLength));
+        assertEquals(Short.MIN_VALUE + 2, marked.getShort(marked.position() + 
headerSizeLength + 2 * headerEltLength));
+
+        int headerLength = headerSizeLength + 3 * headerEltLength;
+        assertTrue(Util.equalsCounterId(CounterId.fromInt(1), marked, 
headerLength));
+        assertEquals(1L, marked.getLong(marked.position() + headerLength + 
idLength));
+        assertEquals(1L, marked.getLong(marked.position() + headerLength + 
idLength + clockLength));
+
+        assertTrue(Util.equalsCounterId(CounterId.fromInt(2), marked, 
headerLength + stepLength));
+        assertEquals(2L, marked.getLong(marked.position() + headerLength + 
stepLength + idLength));
+        assertEquals(2L, marked.getLong(marked.position() + headerLength + 
stepLength + idLength + clockLength));
+
+        assertTrue(Util.equalsCounterId(CounterId.fromInt(3), marked, 
headerLength + 2 * stepLength));
+        assertEquals(3L, marked.getLong(marked.position() + headerLength + 2 * 
stepLength + idLength));
+        assertEquals(3L, marked.getLong(marked.position() + headerLength + 2 * 
stepLength + idLength + clockLength));
+
+        cleared = cc.clearAllLocal(marked);
+        assertFalse(cc.shouldClearLocal(cleared));
+
+        assertEquals(2, cleared.getShort(cleared.position())); // 2 global 
shards
+        assertEquals(Short.MIN_VALUE + 1, cleared.getShort(marked.position() + 
headerEltLength));
+        assertEquals(Short.MIN_VALUE + 2, cleared.getShort(marked.position() + 
headerSizeLength + headerEltLength));
+
+        headerLength = headerSizeLength + 2 * headerEltLength;
+        assertTrue(Util.equalsCounterId(CounterId.fromInt(1), cleared, 
headerLength));
+        assertEquals(1L, cleared.getLong(cleared.position() + headerLength + 
idLength));
+        assertEquals(1L, cleared.getLong(cleared.position() + headerLength + 
idLength + clockLength));
+
+        assertTrue(Util.equalsCounterId(CounterId.fromInt(2), cleared, 
headerLength + stepLength));
+        assertEquals(2L, cleared.getLong(cleared.position() + headerLength + 
stepLength + idLength));
+        assertEquals(2L, cleared.getLong(cleared.position() + headerLength + 
stepLength + idLength + clockLength));
+
+        assertTrue(Util.equalsCounterId(CounterId.fromInt(3), cleared, 
headerLength + 2 * stepLength));
+        assertEquals(3L, cleared.getLong(cleared.position() + headerLength + 2 
* stepLength + idLength));
+        assertEquals(3L, cleared.getLong(cleared.position() + headerLength + 2 
* stepLength + idLength + clockLength));
+
+        // a single global shard - no-op
+        state = ContextState.allocate(1, 0, 0, allocator);
+        state.writeGlobal(CounterId.fromInt(1), 1L, 1L);
+
+        assertFalse(cc.shouldClearLocal(state.context));
+        marked = cc.markLocalToBeCleared(state.context);
+        assertEquals(1, marked.getShort(marked.position()));
+        assertSame(state.context, marked);
+
+        cleared = cc.clearAllLocal(marked);
+        assertSame(cleared, marked);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83cd80b2/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java 
b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 7622728..4cd578d 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.thrift.IndexOperator;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HeapAllocator;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -316,17 +317,13 @@ public class StreamingTransferTest extends SchemaLoader
                 Map<String, ColumnFamily> entries = new HashMap<>();
                 ColumnFamily cf = 
TreeMapBackedSortedColumns.factory.create(cfs.metadata);
                 ColumnFamily cfCleaned = 
TreeMapBackedSortedColumns.factory.create(cfs.metadata);
-                CounterContext.ContextState state = 
CounterContext.ContextState.allocate(4, 1);
-                state.writeElement(CounterId.fromInt(2), 9L, 3L, true);
-                state.writeElement(CounterId.fromInt(4), 4L, 2L);
-                state.writeElement(CounterId.fromInt(6), 3L, 3L);
-                state.writeElement(CounterId.fromInt(8), 2L, 4L);
-                cf.addColumn(new CounterColumn(ByteBufferUtil.bytes(col),
-                        state.context,
-                        timestamp));
-                cfCleaned.addColumn(new 
CounterColumn(ByteBufferUtil.bytes(col),
-                        cc.clearAllDelta(state.context),
-                        timestamp));
+                CounterContext.ContextState state = 
CounterContext.ContextState.allocate(0, 1, 3, HeapAllocator.instance);
+                state.writeLocal(CounterId.fromInt(2), 9L, 3L);
+                state.writeRemote(CounterId.fromInt(4), 4L, 2L);
+                state.writeRemote(CounterId.fromInt(6), 3L, 3L);
+                state.writeRemote(CounterId.fromInt(8), 2L, 4L);
+                cf.addColumn(new CounterColumn(ByteBufferUtil.bytes(col), 
state.context, timestamp));
+                cfCleaned.addColumn(new 
CounterColumn(ByteBufferUtil.bytes(col), cc.clearAllLocal(state.context), 
timestamp));
 
                 entries.put(key, cf);
                 cleanedEntries.put(key, cfCleaned);

Reply via email to