Repository: cassandra
Updated Branches:
  refs/heads/cassandra-13983-rebased2 [created] ae837806b


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/src/java/org/apache/cassandra/utils/concurrent/WeightedQueue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/WeightedQueue.java 
b/src/java/org/apache/cassandra/utils/concurrent/WeightedQueue.java
new file mode 100644
index 0000000..3a6505e
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/WeightedQueue.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Weighted queue is a wrapper around any blocking queue that turns it into a blocking weighted queue. The queue
+ * will weigh each element being added and removed. Adding to the queue is blocked if adding would violate
+ * the weight bound.
+ *
+ * If an element weighs more than the capacity of the queue then exactly one such element will be allowed
+ * into the queue at a time.
+ *
+ * If the weight of an object changes after it is added you are going to have a bad time. Checking weight should be
+ * cheap, so memoize weights that are expensive to compute. If weighing throws, that can also result in leaked
+ * permits, so it's always a good idea to memoize the weight so that it cannot throw.
+ *
+ * In the interests of not writing unit tests for methods no one uses, many methods throw
+ * UnsupportedOperationException. If you need them then add them and add proper unit tests to WeightedQueueTest.
+ * "Good" tests. 100% coverage including exception paths and resource leaks.
+ **/
+public class WeightedQueue<T> implements BlockingQueue<T>
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(WeightedQueue.class);
+    public static final Weigher NATURAL_WEIGHER = (Weigher<Object>) weighable 
->
+    {
+        if (weighable instanceof Weighable)
+        {
+            return ((Weighable)weighable).weight();
+        }
+        return 1;
+    };
+
+    private final Weigher<T> weigher;
+    private final BlockingQueue<T> queue;
+    private final int maxWeight;
+    final Semaphore availableWeight;
+
+    public boolean add(T e)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Attempts to add the element without blocking. Weight is acquired first; if the wrapped queue then
+     * refuses the element (or throws) the weight is handed back.
+     *
+     * @param t element to add, must not be null
+     * @return true if the element was added, false if weight was unavailable or the wrapped queue refused it
+     */
+    public boolean offer(T t)
+    {
+        Preconditions.checkNotNull(t);
+        //No weight available right now, refuse without touching the wrapped queue
+        if (!tryAcquireWeight(t))
+        {
+            return false;
+        }
+
+        boolean accepted = false;
+        try
+        {
+            accepted = queue.offer(t);
+        }
+        finally
+        {
+            //Covers both a refused offer and an exception thrown by the wrapped queue
+            if (!accepted)
+            {
+                releaseWeight(t);
+            }
+        }
+        return accepted;
+    }
+
+    public T remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public T poll()
+    {
+        T retval = queue.poll();
+        releaseWeight(retval);
+        return retval;
+    }
+
+    public T element()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public T peek()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public void put(T t) throws InterruptedException
+    {
+        Preconditions.checkNotNull(t);
+        acquireWeight(t, 0, null);
+        boolean put = false;
+        try
+        {
+            queue.put(t);
+            put = true;
+        }
+        finally
+        {
+            if (!put)
+            {
+                releaseWeight(t);
+            }
+        }
+    }
+
+    /**
+     * Attempts to add the element, waiting up to the given timeout for weight and for space in the wrapped queue.
+     * Both waits draw from a single timeout budget: time spent waiting for weight is subtracted from the time
+     * the wrapped queue is allowed to block, so the call does not wait for up to twice the requested timeout.
+     *
+     * @param t element to add, must not be null
+     * @param timeout total time to wait before giving up, in units of {@code unit}
+     * @param unit unit of {@code timeout}, must not be null
+     * @return true if the element was added, false if the timeout elapsed first
+     * @throws InterruptedException if interrupted while waiting; weight acquired so far is released
+     */
+    public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException
+    {
+        Preconditions.checkNotNull(t);
+        Preconditions.checkNotNull(unit);
+        //Clamp at zero so a negative timeout can't overflow the subtraction below
+        long budgetNanos = Math.max(0L, unit.toNanos(timeout));
+        long startNanos = System.nanoTime();
+        if (!acquireWeight(t, timeout, unit))
+        {
+            return false;
+        }
+
+        boolean offered = false;
+        try
+        {
+            //Only the part of the budget not consumed waiting for weight may be spent waiting on the queue
+            long remainingNanos = Math.max(0L, budgetNanos - (System.nanoTime() - startNanos));
+            offered = queue.offer(t, remainingNanos, TimeUnit.NANOSECONDS);
+            return offered;
+        }
+        finally
+        {
+            //Hand the weight back if the element never made it into the queue (timeout or exception)
+            if (!offered)
+            {
+                releaseWeight(t);
+            }
+        }
+    }
+
+    public T take() throws InterruptedException
+    {
+        T retval = queue.take();
+        releaseWeight(retval);
+        return retval;
+    }
+
+    public T poll(long timeout, TimeUnit unit) throws InterruptedException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public int remainingCapacity()
+    {
+        throw new UnsupportedOperationException("Seems like a bad idea");
+    }
+
+    public boolean remove(Object o)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean containsAll(Collection<?> c)
+    {
+        throw new UnsupportedOperationException("Seems like a bad idea");
+    }
+
+    public boolean addAll(Collection<? extends T> c)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean removeAll(Collection<?> c)
+    {
+        throw new UnsupportedOperationException("Seems like a bad idea");
+    }
+
+    public boolean retainAll(Collection<?> c)
+    {
+        throw new UnsupportedOperationException("Seems like a bad idea");
+    }
+
+    public void clear()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public int size()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean isEmpty()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean contains(Object o)
+    {
+        throw new UnsupportedOperationException("Seems like a bad idea");
+    }
+
+    public Iterator<T> iterator()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public Object[] toArray()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public <T1> T1[] toArray(T1[] a)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public int drainTo(Collection<? super T> c)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Moves at most maxElements elements from this queue into c. Each element is removed with
+     * poll(), so its weight is released as it leaves the queue. Draining stops early once poll()
+     * returns null.
+     *
+     * @param c collection that receives the removed elements
+     * @param maxElements upper bound on the number of elements to move
+     * @return the number of elements added to c
+     */
+    public int drainTo(Collection<? super T> c, int maxElements)
+    {
+        int count = 0;
+        T o;
+        while(count < maxElements && (o = poll()) != null)
+        {
+            c.add(o);
+            count++;
+        }
+        return count;
+    }
+
+    public interface Weigher<T>
+    {
+        int weigh(T weighable);
+    }
+
+    public interface Weighable
+    {
+        int weight();
+    }
+
+    public WeightedQueue(int maxWeight)
+    {
+        this(maxWeight, new LinkedBlockingQueue<T>(), NATURAL_WEIGHER);
+    }
+
+    public WeightedQueue(int maxWeight, BlockingQueue<T> queue, Weigher<T> 
weigher)
+    {
+        Preconditions.checkNotNull(queue);
+        Preconditions.checkNotNull(weigher);
+        Preconditions.checkArgument(maxWeight > 0);
+        this.maxWeight = maxWeight;
+        this.queue = queue;
+        this.weigher = weigher;
+        availableWeight = new Semaphore(maxWeight);
+    }
+
+    /**
+     * Acquires the permits for the weight of the supplied element from availableWeight.
+     * A null unit selects the untimed mode: the call blocks until the permits are available and
+     * then returns true (put() calls it this way with a timeout of 0). A non-null unit selects a
+     * timed tryAcquire whose result is returned.
+     *
+     * @param weighable element to weigh
+     * @param timeout how long to wait for permits, only used when unit is non-null
+     * @param unit unit of timeout, or null to block until the permits are available
+     * @return true if the permits were acquired, false if the timed wait elapsed first
+     * @throws IllegalArgumentException if the weigher reports a weight below 1
+     * @throws InterruptedException if interrupted while waiting for permits
+     */
+    boolean acquireWeight(T weighable, long timeout, TimeUnit unit) throws 
InterruptedException
+    {
+        int weight = weigher.weigh(weighable);
+        if (weight < 1)
+        {
+            throw new IllegalArgumentException(String.format("Weighable: 
\"%s\" had illegal weight %d", Objects.toString(weighable), weight));
+        }
+
+        //Allow exactly one overweight element
+        //Clamping to maxWeight means an overweight element needs every permit, never more than exist
+        weight = Math.min(maxWeight, weight);
+
+        if (unit != null)
+        {
+            return availableWeight.tryAcquire(weight, timeout, unit);
+        }
+        else
+        {
+            availableWeight.acquire(weight);
+            return true;
+        }
+    }
+
+    boolean tryAcquireWeight(T weighable)
+    {
+        int weight = weigher.weigh(weighable);
+        if (weight < 1)
+        {
+            throw new IllegalArgumentException(String.format("Weighable: 
\"%s\" had illegal weight %d", Objects.toString(weighable), weight));
+        }
+
+        //Allow exactly one overweight element
+        weight = Math.min(maxWeight, weight);
+
+        return availableWeight.tryAcquire(weight);
+    }
+
+    void releaseWeight(T weighable)
+    {
+        if (weighable == null)
+        {
+            return;
+        }
+
+        int weight = weigher.weigh(weighable);
+        if (weight < 1)
+        {
+            throw new IllegalArgumentException(String.format("Weighable: 
\"%s\" had illegal weight %d", Objects.toString(weighable), weight));
+        }
+
+        //Allow exactly one overweight element
+        weight = Math.min(maxWeight, weight);
+
+        availableWeight.release(weight);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java 
b/test/unit/org/apache/cassandra/Util.java
index 93115c0..7e62c41 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -551,13 +551,21 @@ public class Util
+    /**
+     * Polls the supplier until it yields a value equal to expected or the timeout elapses, then asserts on the
+     * last value observed. The supplier is always evaluated at least once, and the value that ended the loop is
+     * the one asserted on, so a supplier with side effects (e.g. Semaphore::tryAcquire) is not evaluated again
+     * after it has already produced the expected value. Null values are compared without throwing.
+     */
     public static void spinAssertEquals(Object expected, Supplier<Object> s, int timeoutInSeconds)
     {
         long start = System.currentTimeMillis();
+        Object lastValue = s.get();
         while (System.currentTimeMillis() < start + (1000 * timeoutInSeconds))
         {
-            if (s.get().equals(expected))
+            if (java.util.Objects.equals(expected, lastValue))
                 break;
             Thread.yield();
+            lastValue = s.get();
         }
-        assertEquals(expected, s.get());
+        assertEquals(expected, lastValue);
     }
 
     public static void joinThread(Thread thread) throws InterruptedException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/test/unit/org/apache/cassandra/db/fullquerylog/FullQueryLoggerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/fullquerylog/FullQueryLoggerTest.java 
b/test/unit/org/apache/cassandra/db/fullquerylog/FullQueryLoggerTest.java
new file mode 100644
index 0000000..175cf8c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/fullquerylog/FullQueryLoggerTest.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.fullquerylog;
+
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.Unpooled;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptTailer;
+import net.openhft.chronicle.queue.RollCycles;
+import net.openhft.chronicle.wire.ValueIn;
+import net.openhft.chronicle.wire.WireOut;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.cql3.QueryOptions;
+import 
org.apache.cassandra.db.fullquerylog.FullQueryLogger.WeighableMarshallableQuery;
+import 
org.apache.cassandra.db.fullquerylog.FullQueryLogger.WeighableMarshallableBatch;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.binlog.BinLogTest;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FullQueryLoggerTest
+{
+    private static Path tempDir;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception
+    {
+        tempDir = BinLogTest.tempDir();
+    }
+
+    @After
+    public void tearDown()
+    {
+        FullQueryLogger.instance.reset(tempDir.toString());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testConfigureNullPath() throws Exception
+    {
+        FullQueryLogger.instance.configure(null, "", true, 1, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testConfigureNullRollCycle() throws Exception
+    {
+        FullQueryLogger.instance.configure(BinLogTest.tempDir(), null, true, 
1, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConfigureInvalidRollCycle() throws Exception
+    {
+        FullQueryLogger.instance.configure(BinLogTest.tempDir(), "foobar", 
true, 1, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConfigureInvalidMaxQueueWeight() throws Exception
+    {
+        FullQueryLogger.instance.configure(BinLogTest.tempDir(), "DAILY", 
true, 0, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConfigureInvalidMaxQueueLogSize() throws Exception
+    {
+        FullQueryLogger.instance.configure(BinLogTest.tempDir(), "DAILY", 
true, 1, 0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConfigureOverExistingFile() throws Exception
+    {
+        File f = File.createTempFile("foo", "bar");
+        f.deleteOnExit();
+        FullQueryLogger.instance.configure(f.toPath(), "TEST_SECONDLY", true, 
1, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCanRead() throws Exception
+    {
+        tempDir.toFile().setReadable(false);
+        try
+        {
+            configureFQL();
+        }
+        finally
+        {
+            tempDir.toFile().setReadable(true);
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCanWrite() throws Exception
+    {
+        tempDir.toFile().setWritable(false);
+        try
+        {
+            configureFQL();
+        }
+        finally
+        {
+            tempDir.toFile().setWritable(true);
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCanExecute() throws Exception
+    {
+        tempDir.toFile().setExecutable(false);
+        try
+        {
+            configureFQL();
+        }
+        finally
+        {
+            tempDir.toFile().setExecutable(true);
+        }
+    }
+
+    @Test
+    public void testResetWithoutConfigure() throws Exception
+    {
+        FullQueryLogger.instance.reset(tempDir.toString());
+        FullQueryLogger.instance.reset(tempDir.toString());
+    }
+
+    @Test
+    public void stopWithoutConfigure() throws Exception
+    {
+        FullQueryLogger.instance.stop();
+        FullQueryLogger.instance.stop();
+    }
+
+    /**
+     * Both the last used and supplied directory should get cleaned
+     */
+    @Test
+    public void testResetCleansPaths() throws Exception
+    {
+        configureFQL();
+        File tempA = File.createTempFile("foo", "bar", tempDir.toFile());
+        assertTrue(tempA.exists());
+        File tempB = File.createTempFile("foo", "bar", 
BinLogTest.tempDir().toFile());
+        FullQueryLogger.instance.reset(tempB.getParent());
+        assertFalse(tempA.exists());
+        assertFalse(tempB.exists());
+    }
+
+    /**
+     * The last used and configured directory are the same and it shouldn't be 
an issue
+     */
+    @Test
+    public void testResetSamePath() throws Exception
+    {
+        configureFQL();
+        File tempA = File.createTempFile("foo", "bar", tempDir.toFile());
+        assertTrue(tempA.exists());
+        FullQueryLogger.instance.reset(tempA.getParent());
+        assertFalse(tempA.exists());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testDoubleConfigure() throws Exception
+    {
+        configureFQL();
+        configureFQL();
+    }
+
+    /**
+     * A file that exists in the log directory before configure should be gone afterwards, leaving a single
+     * file named directory-listing.cq4t.
+     */
+    @Test
+    public void testCleansDirectory() throws Exception
+    {
+        assertTrue(new File(tempDir.toFile(), "foobar").createNewFile());
+        configureFQL();
+        //List once so both assertions look at the same snapshot of the directory
+        File[] remaining = tempDir.toFile().listFiles();
+        //Expected value first so a failure message reads the right way around
+        assertEquals(1, remaining.length);
+        assertEquals("directory-listing.cq4t", remaining[0].getName());
+    }
+
+    @Test
+    public void testEnabledReset() throws Exception
+    {
+        assertFalse(FullQueryLogger.instance.enabled());
+        configureFQL();
+        assertTrue(FullQueryLogger.instance.enabled());
+        FullQueryLogger.instance.reset(tempDir.toString());
+        assertFalse(FullQueryLogger.instance.enabled());
+    }
+
+    @Test
+    public void testEnabledStop() throws Exception
+    {
+        assertFalse(FullQueryLogger.instance.enabled());
+        configureFQL();
+        assertTrue(FullQueryLogger.instance.enabled());
+        FullQueryLogger.instance.stop();
+        assertFalse(FullQueryLogger.instance.enabled());
+    }
+
+    /**
+     * Test that a thread will block if the FQL is over weight, and unblock 
once the backup is cleared
+     */
+    @Test
+    public void testBlocking() throws Exception
+    {
+        configureFQL();
+        //Prevent the bin log thread from making progress, causing the task 
queue to block
+        Semaphore blockBinLog = new Semaphore(0);
+        try
+        {
+            //Find out when the bin log thread has been blocked, necessary to 
not run into batch task drain behavior
+            Semaphore binLogBlocked = new Semaphore(0);
+            FullQueryLogger.instance.binLog.put(new 
WeighableMarshallableQuery("foo1", QueryOptions.DEFAULT, 1)
+            {
+
+                public void writeMarshallable(WireOut wire)
+                {
+                    //Notify that the bin log is blocking now
+                    binLogBlocked.release();
+                    try
+                    {
+                        //Block the bin log thread so the task queue can be 
filled
+                        blockBinLog.acquire();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        e.printStackTrace();
+                    }
+                    super.writeMarshallable(wire);
+                }
+
+                public void release()
+                {
+                    super.release();
+                }
+            });
+
+            //Wait for the bin log thread to block so it can't batch drain 
tasks
+            Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60);
+
+            //Now fill the task queue
+            logQuery("foo2");
+
+            //Start a thread to block waiting on the bin log queue
+            Thread t = new Thread(() ->
+                                  {
+                                      logQuery("foo3");
+                                      //Should be able to log another query 
without an issue
+                                      logQuery("foo4");
+                                  });
+            t.start();
+            Thread.sleep(500);
+            //If thread state is terminated then the thread started, finished, 
and didn't block on the full task queue
+            assertTrue(t.getState() != Thread.State.TERMINATED);
+        }
+        finally
+        {
+            //Unblock the binlog thread
+            blockBinLog.release();
+        }
+        Util.spinAssertEquals(true, () -> 
checkForQueries(Arrays.asList("foo1", "foo2", "foo3", "foo4")), 60);
+    }
+
+    /**
+     * Reads the chronicle queue in tempDir from the start and checks that it contains exactly the supplied
+     * queries, in order. Intended to be polled: a log that is still missing documents yields false, while a
+     * document whose "query" field differs from the next expected query, or a document beyond the expected
+     * ones, fails an assertion.
+     *
+     * @param queries the query strings expected in the log, in order
+     * @return true if every expected query was read and no further document exists, false if the log ran
+     *         out of documents first
+     */
+    private boolean checkForQueries(List<String> queries)
+    {
+        try (ChronicleQueue queue = 
ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+        {
+            ExcerptTailer tailer = queue.createTailer();
+            //Work on a copy so the caller's list is not consumed
+            List<String> expectedQueries = new LinkedList<>(queries);
+            while (!expectedQueries.isEmpty())
+            {
+                if (!tailer.readDocument(wire -> {
+                    assertEquals(expectedQueries.get(0), 
wire.read("query").text());
+                    expectedQueries.remove(0);
+                }))
+                {
+                    //No document available yet for the next expected query
+                    return false;
+                }
+            }
+            //Nothing beyond the expected queries may be present
+            assertFalse(tailer.readDocument(wire -> {}));
+            return true;
+        }
+    }
+
+    @Test
+    public void testNonBlocking() throws Exception
+    {
+        FullQueryLogger.instance.configure(tempDir, "TEST_SECONDLY", false, 1, 
1024 * 1024 * 256);
+        //Prevent the bin log thread from making progress, causing the task 
queue to refuse tasks
+        Semaphore blockBinLog = new Semaphore(0);
+        try
+        {
+            //Find out when the bin log thread has been blocked, necessary to 
not run into batch task drain behavior
+            Semaphore binLogBlocked = new Semaphore(0);
+            FullQueryLogger.instance.binLog.put(new 
WeighableMarshallableQuery("foo1", QueryOptions.DEFAULT, 1)
+            {
+
+                public void writeMarshallable(WireOut wire)
+                {
+                    //Notify that the bin log is blocking now
+                    binLogBlocked.release();
+                    try
+                    {
+                        //Block the bin log thread so the task queue can be 
filled
+                        blockBinLog.acquire();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        e.printStackTrace();
+                    }
+                    super.writeMarshallable(wire);
+                }
+
+                public void release()
+                {
+                    super.release();
+                }
+            });
+
+            //Wait for the bin log thread to block so it can't batch drain 
tasks
+            Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60);
+
+            //Now fill the task queue
+            logQuery("foo2");
+
+            //This sample should get dropped AKA released without being written
+            AtomicInteger releasedCount = new AtomicInteger(0);
+            AtomicInteger writtenCount = new AtomicInteger(0);
+            FullQueryLogger.instance.logRecord(new 
WeighableMarshallableQuery("foo3", QueryOptions.DEFAULT, 1) {
+                public void writeMarshallable(WireOut wire)
+                {
+                    writtenCount.incrementAndGet();
+                    super.writeMarshallable(wire);
+                }
+
+                public void release()
+                {
+                    releasedCount.incrementAndGet();
+                    super.release();
+                }
+            }, FullQueryLogger.instance.binLog);
+
+            Util.spinAssertEquals(1, releasedCount::get, 60);
+            assertEquals(0, writtenCount.get());
+        }
+        finally
+        {
+            blockBinLog.release();
+        }
+        //Wait for tasks to drain so there should be space in the queue
+        Util.spinAssertEquals(true, () -> 
checkForQueries(Arrays.asList("foo1", "foo2")), 60);
+        //Should be able to log again
+        logQuery("foo4");
+        Util.spinAssertEquals(true, () -> 
checkForQueries(Arrays.asList("foo1", "foo2", "foo4")), 60);
+    }
+
+    @Test
+    public void testRoundTripQuery() throws Exception
+    {
+        configureFQL();
+        logQuery("foo");
+        Util.spinAssertEquals(true, () -> 
checkForQueries(Arrays.asList("foo")), 60);
+        try (ChronicleQueue queue = 
ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+        {
+            ExcerptTailer tailer = queue.createTailer();
+            assertTrue(tailer.readDocument(wire -> {
+                assertEquals("single", wire.read("type").text());
+                ProtocolVersion protocolVersion = 
ProtocolVersion.decode(wire.read("protocol-version").int32());
+                assertEquals(ProtocolVersion.CURRENT, protocolVersion);
+                QueryOptions queryOptions = 
QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read("query-options").bytes()),
 protocolVersion);
+                compareQueryOptions(QueryOptions.DEFAULT, queryOptions);
+                assertEquals(1L, wire.read("query-time").int64());
+                assertEquals("foo", wire.read("query").text());
+            }));
+        }
+    }
+
+    @Test
+    public void testRoundTripBatch() throws Exception
+    {
+        configureFQL();
+        FullQueryLogger.instance.logBatch("UNLOGGED", Arrays.asList("foo1", 
"foo2"), Arrays.asList(Arrays.asList(ByteBuffer.allocate(1) , 
ByteBuffer.allocateDirect(2)), Arrays.asList()), QueryOptions.DEFAULT, 1);
+        Util.spinAssertEquals(true, () ->
+        {
+            try (ChronicleQueue queue = 
ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+            {
+                return queue.createTailer().readingDocument().isPresent();
+            }
+        }, 60);
+        try (ChronicleQueue queue = 
ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+        {
+            ExcerptTailer tailer = queue.createTailer();
+            assertTrue(tailer.readDocument(wire -> {
+                assertEquals("batch", wire.read("type").text());
+                ProtocolVersion protocolVersion = 
ProtocolVersion.decode(wire.read("protocol-version").int32());
+                assertEquals(ProtocolVersion.CURRENT, protocolVersion);
+                QueryOptions queryOptions = 
QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read("query-options").bytes()),
 protocolVersion);
+                assertEquals(1L, wire.read("query-time").int64());
+                compareQueryOptions(QueryOptions.DEFAULT, queryOptions);
+                assertEquals("UNLOGGED", wire.read("batch-type").text());
+                ValueIn in = wire.read("queries");
+                assertEquals(2, in.int32());
+                assertEquals("foo1", in.text());
+                assertEquals("foo2", in.text());
+                in = wire.read("values");
+                assertEquals(2, in.int32());
+                assertEquals(2, in.int32());
+                assertTrue(Arrays.equals(new byte[1], in.bytes()));
+                assertTrue(Arrays.equals(new byte[2], in.bytes()));
+                assertEquals(0, in.int32());
+            }));
+        }
+    }
+
+    /**
+     * The weight of a query should account for a fixed overhead, the query string, and the query options.
+     */
+    @Test
+    public void testQueryWeight()
+    {
+        //Empty query should have some weight
+        WeighableMarshallableQuery query = new WeighableMarshallableQuery("", QueryOptions.DEFAULT, 1);
+        assertTrue(query.weight() >= 95);
+
+        StringBuilder sb = new StringBuilder(1024 * 1024);
+        for (int ii = 0; ii < 1024 * 1024; ii++)
+        {
+            sb.append('a');
+        }
+        //Materialize the large string once and reuse it for construction and measurement
+        String largeQuery = sb.toString();
+        query = new WeighableMarshallableQuery(largeQuery, QueryOptions.DEFAULT, 1);
+
+        //A large query should be reflected in the size, * 2 since characters are still two bytes
+        assertTrue(query.weight() > ObjectSizes.measureDeep(largeQuery));
+
+        //Large query options should be reflected
+        QueryOptions largeOptions = QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024)));
+        query = new WeighableMarshallableQuery("", largeOptions, 1);
+        assertTrue(query.weight() > 1024 * 1024);
+    }
+
+    /**
+     * The weight of a batch should account for a fixed overhead, the batch type, the list of queries and
+     * their contents, the list of values and their contents, and the query options.
+     */
+    @Test
+    public void testBatchWeight()
+    {
+        //An empty batch should have weight
+        WeighableMarshallableBatch batch = new WeighableMarshallableBatch("", new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1);
+        assertTrue(batch.weight() >= 183);
+
+        StringBuilder sb = new StringBuilder();
+        for (int ii = 0; ii < 1024 * 1024; ii++)
+        {
+            sb.append('a');
+        }
+
+        //The weight of the type string should be reflected
+        batch = new WeighableMarshallableBatch(sb.toString(), new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1);
+        assertTrue(batch.weight() > ObjectSizes.measureDeep(sb.toString()));
+
+        //The weight of the list containing queries should be reflected
+        List<String> bigList = new ArrayList<>(100000);
+        for (int ii = 0; ii < 100000; ii++)
+        {
+            bigList.add("");
+        }
+        batch = new WeighableMarshallableBatch("", bigList, new ArrayList<>(), QueryOptions.DEFAULT, 1);
+        assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList));
+
+        //The size of the query should be reflected
+        bigList = new ArrayList<>(1);
+        bigList.add(sb.toString());
+        batch = new WeighableMarshallableBatch("", bigList, new ArrayList<>(), QueryOptions.DEFAULT, 1);
+        assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList));
+
+        //Drop the reference to the large list before building the next large structure
+        bigList = null;
+        //The size of the list of values should be reflected
+        List<List<ByteBuffer>> bigValues = new ArrayList<>(100000);
+        for (int ii = 0; ii < 100000; ii++)
+        {
+            bigValues.add(new ArrayList<>(0));
+        }
+        bigValues.get(0).add(ByteBuffer.allocate(1024 * 1024 * 5));
+        batch = new WeighableMarshallableBatch("", new ArrayList<>(), bigValues, QueryOptions.DEFAULT, 1);
+        assertTrue(batch.weight() > ObjectSizes.measureDeep(bigValues));
+
+        //As should the size of the values
+        QueryOptions largeOptions = QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024)));
+        batch = new WeighableMarshallableBatch("", new ArrayList<>(), new ArrayList<>(), largeOptions, 1);
+        assertTrue(batch.weight() > 1024 * 1024);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogBatchNullType() throws Exception
+    {
+        FullQueryLogger.instance.logBatch(null, new ArrayList<>(), new 
ArrayList<>(), QueryOptions.DEFAULT, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogBatchNullQueries() throws Exception
+    {
+        FullQueryLogger.instance.logBatch("", null, new ArrayList<>(), 
QueryOptions.DEFAULT, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogBatchNullQueriesQuery() throws Exception
+    {
+        configureFQL();
+        FullQueryLogger.instance.logBatch("", Arrays.asList((String)null), new 
ArrayList<>(), QueryOptions.DEFAULT, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogBatchNullValues() throws Exception
+    {
+        FullQueryLogger.instance.logBatch("", new ArrayList<>(), null, 
QueryOptions.DEFAULT, 1);
+    }
+
+    //Passes a values list whose only element is null and expects a NullPointerException.
+    //NOTE(review): the QueryOptions argument below is also null, the same input that
+    //testLogBatchNullQueryOptions expects to raise NullPointerException on its own, so this test
+    //may never reach the null element in values - confirm against FullQueryLogger.logBatch
+    @Test(expected = NullPointerException.class)
+    public void testLogBatchNullValuesValue() throws Exception
+    {
+        FullQueryLogger.instance.logBatch("", new ArrayList<>(), 
Arrays.asList((List<ByteBuffer>)null), null, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogBatchNullQueryOptions() throws Exception
+    {
+        FullQueryLogger.instance.logBatch("", new ArrayList<>(), new 
ArrayList<>(), null, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testLogBatchNegativeTime() throws Exception
+    {
+        FullQueryLogger.instance.logBatch("", new ArrayList<>(), new 
ArrayList<>(), QueryOptions.DEFAULT, -1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogQueryNullQuery() throws Exception
+    {
+        FullQueryLogger.instance.logQuery(null, QueryOptions.DEFAULT, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testLogQueryNullQueryOptions() throws Exception
+    {
+        FullQueryLogger.instance.logQuery("", null, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testLogQueryNegativeTime() throws Exception
+    {
+        FullQueryLogger.instance.logQuery("", QueryOptions.DEFAULT, -1);
+    }
+
+    private static void compareQueryOptions(QueryOptions a, QueryOptions b)
+    {
+        assertEquals(a.getClass(), b.getClass());
+        assertEquals(a.getProtocolVersion(), b.getProtocolVersion());
+        assertEquals(a.getPageSize(), b.getPageSize());
+        assertEquals(a.getConsistency(), b.getConsistency());
+        assertEquals(a.getPagingState(), b.getPagingState());
+        assertEquals(a.getValues(), b.getValues());
+        assertEquals(a.getSerialConsistency(), b.getSerialConsistency());
+    }
+
+    private void configureFQL() throws Exception
+    {
+        FullQueryLogger.instance.configure(tempDir, "TEST_SECONDLY", true, 1, 
1024 * 1024 * 256);
+    }
+
+    private void logQuery(String query)
+    {
+        FullQueryLogger.instance.logQuery(query, QueryOptions.DEFAULT, 1);
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java 
b/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
new file mode 100644
index 0000000..204e27b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.binlog;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptTailer;
+import net.openhft.chronicle.queue.RollCycles;
+import net.openhft.chronicle.wire.WireOut;
+import org.apache.cassandra.Util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class BinLogTest
+{
+    public static Path tempDir() throws Exception
+    {
+        File f = File.createTempFile("foo", "bar");
+        f.delete();
+        f.mkdir();
+        return Paths.get(f.getPath());
+    }
+
+    private static final String testString = "ry@nlikestheyankees";
+    private static final String testString2 = testString + "1";
+
+    private BinLog binLog;
+    private Path path;
+
+    @Before
+    public void setUp() throws Exception
+    {
+        path = tempDir();
+        binLog = new BinLog(path, RollCycles.TEST_SECONDLY, 10, 1024 * 1024 * 
128);
+        binLog.start();
+    }
+
+    /**
+     * Stops the bin log, unless the test has already discarded it, and deletes the files left in the
+     * temporary directory. Tests that replace the bin log also call this directly.
+     */
+    @After
+    public void tearDown() throws Exception
+    {
+        try
+        {
+            if (binLog != null)
+            {
+                binLog.stop();
+            }
+        }
+        finally
+        {
+            //Clean up even if stop() throws; listFiles() returns null (not an empty array) when the
+            //directory is missing or cannot be read
+            File[] files = path == null ? null : path.toFile().listFiles();
+            if (files != null)
+            {
+                for (File f : files)
+                {
+                    f.delete();
+                }
+            }
+        }
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testConstructorNullPath() throws Exception
+    {
+        new BinLog(null, RollCycles.TEST_SECONDLY, 1, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testConstructorNullRollCycle() throws Exception
+    {
+        new BinLog(tempDir(), null, 1, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConstructorZeroWeight() throws Exception
+    {
+        new BinLog(tempDir(), RollCycles.TEST_SECONDLY, 0, 1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConstructorLogSize() throws Exception
+    {
+        new BinLog(tempDir(), RollCycles.TEST_SECONDLY, 1, 0);
+    }
+
+    /**
+     * Check that the bin log can be started and stopped and that it releases the resources held by
+     * every record that was appended to it
+     */
+    @Test
+    public void testBinLogStartStop() throws Exception
+    {
+        //Created with a single permit, so the first acquire() below returns without blocking
+        Semaphore blockBinLog = new Semaphore(1);
+        AtomicInteger releaseCount = new AtomicInteger();
+        binLog.put(new BinLog.ReleaseableWriteMarshallable()
+        {
+            protected void release()
+            {
+                releaseCount.incrementAndGet();
+            }
+
+            public void writeMarshallable(WireOut wire)
+            {
+                try
+                {
+                    blockBinLog.acquire();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+        binLog.put(new BinLog.ReleaseableWriteMarshallable()
+        {
+
+            public void writeMarshallable(WireOut wire)
+            {
+
+            }
+
+            protected void release()
+            {
+                releaseCount.incrementAndGet();
+            }
+        });
+        Thread.sleep(1000);
+        assertEquals(2, releaseCount.get());
+        //Stop on a separate thread so that a stop() that never returns fails the test via the timed join
+        Thread t = new Thread(() -> {
+            try
+            {
+                binLog.stop();
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+        });
+        t.start();
+        t.join(60 * 1000);
+        //Expected value goes first so a failure reports the states the right way round
+        assertEquals(Thread.State.TERMINATED, t.getState());
+
+        Util.spinAssertEquals(2, releaseCount::get, 60);
+        Util.spinAssertEquals(Thread.State.TERMINATED, binLog.binLogThread::getState, 60);
+    }
+
+    /**
+     * Check that the finalizer releases any stragglers in the queue
+     */
+    @Test
+    public void testBinLogFinalizer() throws Exception
+    {
+        binLog.stop();
+        Semaphore released = new Semaphore(0);
+        binLog.sampleQueue.put(new BinLog.ReleaseableWriteMarshallable()
+        {
+            protected void release()
+            {
+                released.release();
+            }
+
+            public void writeMarshallable(WireOut wire)
+            {
+
+            }
+        });
+        binLog = null;
+
+        for (int ii = 0; ii < 30; ii++)
+        {
+            System.gc();
+            System.runFinalization();
+            Thread.sleep(100);
+            if (released.tryAcquire())
+                return;
+        }
+        fail("Finalizer never released resources");
+    }
+
+    /**
+     * Test that put blocks and unblocks and creates records
+     */
+    @Test
+    public void testPut() throws Exception
+    {
+        binLog.put(record(testString));
+        binLog.put(record(testString2));
+
+        Util.spinAssertEquals(2, () -> readBinLogRecords(path).size(), 60);
+        List<String> records = readBinLogRecords(path);
+        assertEquals(testString, records.get(0));
+        assertEquals(testString2, records.get(1));
+
+
+        //Prevent the bin log thread from making progress
+        Semaphore blockBinLog = new Semaphore(0);
+        //Get notified when the bin log thread has blocked and definitely 
won't batch drain tasks
+        Semaphore binLogBlocked = new Semaphore(0);
+        try
+        {
+            binLog.put(new BinLog.ReleaseableWriteMarshallable()
+            {
+                protected void release()
+                {
+                }
+
+                public void writeMarshallable(WireOut wire)
+                {
+                    //Notify that the bin log thread is about to block
+                    binLogBlocked.release();
+                    try
+                    {
+                        //Block the bin log thread so it doesn't process more 
tasks
+                        blockBinLog.acquire();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+
+            //Wait for the bin log thread to block so it doesn't batch drain
+            Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60);
+
+            //Now fill the queue up to capacity and it shouldn't block
+            for (int ii = 0; ii < 10; ii++)
+            {
+                binLog.put(record(testString));
+            }
+
+            //Thread to block on the full queue
+            Thread t = new Thread(() ->
+                                  {
+                                      try
+                                      {
+                                          binLog.put(record(testString));
+                                          //Should be able to do it again 
after unblocking
+                                          binLog.put(record(testString));
+                                      }
+                                      catch (InterruptedException e)
+                                      {
+                                          throw new AssertionError(e);
+                                      }
+                                  });
+            t.start();
+            Thread.sleep(500);
+            //If the thread is not terminated then it is probably blocked on 
the queue
+            assertTrue(t.getState() != Thread.State.TERMINATED);
+        }
+        finally
+        {
+            blockBinLog.release();
+        }
+
+        //Expect all the records to eventually be there including one from the 
blocked thread
+        Util.spinAssertEquals(15, () -> readBinLogRecords(path).size(), 60);
+    }
+
+    @Test
+    public void testOffer() throws Exception
+    {
+        assertTrue(binLog.offer(record(testString)));
+        assertTrue(binLog.offer(record(testString2)));
+
+        Util.spinAssertEquals(2, () -> readBinLogRecords(path).size(), 60);
+        List<String> records = readBinLogRecords(path);
+        assertEquals(testString, records.get(0));
+        assertEquals(testString2, records.get(1));
+
+        //Prevent the bin log thread from making progress
+        Semaphore blockBinLog = new Semaphore(0);
+        //Get notified when the bin log thread has blocked and definitely 
won't batch drain tasks
+        Semaphore binLogBlocked = new Semaphore(0);
+        try
+        {
+            assertTrue(binLog.offer(new BinLog.ReleaseableWriteMarshallable()
+            {
+                protected void release()
+                {
+                }
+
+                public void writeMarshallable(WireOut wire)
+                {
+                    //Notify that the bin log thread is about to block
+                    binLogBlocked.release();
+                    try
+                    {
+                        //Block the bin log thread so it doesn't process more 
tasks
+                        blockBinLog.acquire();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }));
+
+            //Wait for the bin log thread to block so it doesn't batch drain
+            Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60);
+
+            //Now fill the queue up to capacity and it should always accept
+            for (int ii = 0; ii < 10; ii++)
+            {
+                assertTrue(binLog.offer(record(testString)));
+            }
+
+            //It should reject this record since the queue is full
+            assertFalse(binLog.offer(record(testString)));
+        }
+        finally
+        {
+            blockBinLog.release();
+        }
+        Util.spinAssertEquals(13, () -> readBinLogRecords(path).size(), 60);
+        assertTrue(binLog.offer(record(testString)));
+        Util.spinAssertEquals(14, () -> readBinLogRecords(path).size(), 60);
+    }
+
+    /**
+     * Set a very small segment size so on rolling the segments are always 
deleted
+     */
+    @Test
+    public void testCleanupOnOversize() throws Exception
+    {
+        tearDown();
+        binLog = new BinLog(path, RollCycles.TEST_SECONDLY, 10000, 1);
+        binLog.start();
+        for (int ii = 0; ii < 5; ii++)
+        {
+            binLog.put(record(String.valueOf(ii)));
+            Thread.sleep(1001);
+        }
+        List<String> records = readBinLogRecords(path);
+        System.out.println("Records found are " + records);
+        assertTrue(records.size() < 5);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testNoReuse() throws Exception
+    {
+        binLog.stop();
+        binLog.start();
+    }
+
+    @Test
+    public void testOfferAfterStop() throws Exception
+    {
+        binLog.stop();
+        assertFalse(binLog.offer(record(testString)));
+    }
+
+    @Test
+    public void testPutAfterStop() throws Exception
+    {
+        binLog.stop();
+        binLog.put(record(testString));
+        assertEquals(null, binLog.sampleQueue.poll());
+    }
+
+    /**
+     * Test for a bug where files were deleted but the space was not reclaimed when tracking so
+     * all log segments were incorrectly deleted when rolled.
+     */
+    @Test
+    public void testTrucationReleasesLogSpace() throws Exception
+    {
+        StringBuilder sb = new StringBuilder();
+        for (int ii = 0; ii < 1024 * 1024 * 2; ii++)
+        {
+            sb.append('a');
+        }
+
+        String queryString = sb.toString();
+
+        //This should fill up the log so when it rolls in the future it will 
always delete the rolled segment;
+        for (int ii = 0; ii < 129; ii++)
+        {
+            binLog.put(record(queryString));
+        }
+
+        for (int ii = 0; ii < 2; ii++)
+        {
+            Thread.sleep(2000);
+            binLog.put(record(queryString));
+        }
+
+        Util.spinAssertEquals(2, () -> readBinLogRecords(path).size(), 60);
+    }
+
+    static BinLog.ReleaseableWriteMarshallable record(String text)
+    {
+        return new BinLog.ReleaseableWriteMarshallable()
+        {
+            protected void release()
+            {
+                //Do nothing
+            }
+
+            public void writeMarshallable(WireOut wire)
+            {
+                wire.write("text").text(text);
+            }
+        };
+    }
+
+    List<String> readBinLogRecords(Path path)
+    {
+        List<String> records = new ArrayList<String>();
+        try (ChronicleQueue queue = 
ChronicleQueueBuilder.single(path.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
+        {
+            ExcerptTailer tailer = queue.createTailer();
+            while (true)
+            {
+                if (!tailer.readDocument(wire ->
+                                         {
+                                             
records.add(wire.read("text").text());
+                                         }))
+                {
+                    return records;
+                }
+            }
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/test/unit/org/apache/cassandra/utils/concurrent/WeightedQueueTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/utils/concurrent/WeightedQueueTest.java 
b/test/unit/org/apache/cassandra/utils/concurrent/WeightedQueueTest.java
new file mode 100644
index 0000000..544e95c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/concurrent/WeightedQueueTest.java
@@ -0,0 +1,656 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class WeightedQueueTest
+{
+    private static WeightedQueue<Object> queue()
+    {
+        return new WeightedQueue<>(10);
+    }
+
+    private WeightedQueue<Object> queue;
+
+    @Before
+    public void setUp()
+    {
+        queue = queue();
+    }
+
+    private static WeightedQueue.Weighable weighable(int weight)
+    {
+        return () -> weight;
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAddUnsupported() throws Exception
+    {
+        queue.add(new Object());
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testRemoveUnsupported() throws Exception
+    {
+        queue.remove();
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testElementUnsupported() throws Exception
+    {
+        queue.element();
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testPeekUnsupported() throws Exception
+    {
+        queue.peek();
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testRemainingCapacityUnsupported() throws Exception
+    {
+        queue.remainingCapacity();
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testRemoveElementUnsupported() throws Exception
+    {
+        queue.remove(null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testContainsAllUnsupported() throws Exception
+    {
+        queue.containsAll(null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAddAllUnsupported() throws Exception
+    {
+        queue.addAll(null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testRemoveAllUnsupported() throws Exception
+    {
+        queue.removeAll(null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testRetainAllUnsupported() throws Exception
+    {
+        queue.retainAll(null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testClearUnsupported() throws Exception
+    {
+        queue.clear();
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testSizeUnsupported() throws Exception
+    {
+        queue.size();
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testIsEmptyUnsupported() throws Exception
+    {
+        queue.isEmpty();
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testContainsUnsupported() throws Exception
+    {
+        queue.contains(null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testIteratorUnsupported() throws Exception
+    {
+        queue.iterator();
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testToArrayUnsupported() throws Exception
+    {
+        queue.toArray();
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testToArray2Unsupported() throws Exception
+    {
+        queue.toArray( new Object[] {});
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testDrainToUnsupported() throws Exception
+    {
+        queue.drainTo(new ArrayList<>());
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testTimedPollUnsupported() throws Exception
+    {
+        queue.poll(1, TimeUnit.MICROSECONDS);
+    }
+
+    @Test
+    public void testDrainToWithLimit() throws Exception
+    {
+        queue.offer(new Object());
+        queue.offer(new Object());
+        queue.offer(new Object());
+        ArrayList<Object> list = new ArrayList<>();
+        queue.drainTo(list, 1);
+        assertEquals(1, list.size());
+        list.clear();
+        queue.drainTo(list, 10);
+        assertEquals(2, list.size());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void offerNullThrows() throws Exception
+    {
+        queue.offer(null);
+    }
+
+    /**
+     * This also tests that natural weight (weighable interface) is respected
+     */
+    @Test
+    public void offerFullFails() throws Exception
+    {
+        assertTrue(queue.offer(weighable(10)));
+        assertFalse(queue.offer(weighable(1)));
+    }
+
+    /**
+     * Validate permits aren't leaked and return values are correct
+     */
+    @Test
+    public void testOfferWrappedQueueRefuses() throws Exception
+    {
+        queue = new WeightedQueue<>(10, new BadQueue(true), 
WeightedQueue.NATURAL_WEIGHER);
+        assertEquals(10, queue.availableWeight.availablePermits());
+        assertFalse(queue.offer(new Object()));
+        assertEquals(10, queue.availableWeight.availablePermits());
+    }
+
+    /**
+     * Validate permits aren't leaked and return values are correct
+     */
+    @Test
+    public void testOfferWrappedQueueThrows() throws Exception
+    {
+        queue = new WeightedQueue<>(10, new BadQueue(false), 
WeightedQueue.NATURAL_WEIGHER);
+        assertEquals(10, queue.availableWeight.availablePermits());
+        try
+        {
+            assertFalse(queue.offer(new Object()));
+            fail();
+        }
+        catch (UnsupportedOperationException e)
+        {
+            //expected and desired
+        }
+        assertEquals(10, queue.availableWeight.availablePermits());
+    }
+
+    /**
+     * If not weighable and not custom weigher the default weight is 1
+     */
+    @Test
+    public void defaultWeightRespected() throws Exception
+    {
+        for (int ii = 0; ii < 10; ii++)
+        {
+            assertTrue(queue.offer(new Object()));
+        }
+        assertFalse(queue.offer(new Object()));
+    }
+
+    @Test
+    public void testCustomWeigher() throws Exception
+    {
+        queue = new WeightedQueue<>(10, new LinkedBlockingQueue<>(), weighable 
-> 10 );
+        assertTrue(queue.offer(new Object()));
+        assertFalse(queue.offer(new Object()));
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testCustomQueue() throws Exception
+    {
+        new WeightedQueue<>(10, new BadQueue(false), 
WeightedQueue.NATURAL_WEIGHER).offer(new Object());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void timedOfferNullValueThrows() throws Exception
+    {
+        queue.offer(null, 1, TimeUnit.SECONDS);
+    }
+
+    /**
+     * A null time unit must be rejected even when the element being offered is valid
+     */
+    @Test(expected = NullPointerException.class)
+    public void timedOfferNullTimeThrows() throws Exception
+    {
+        //Offer a non-null element so the NPE can only come from the null unit; a null element is
+        //already covered by timedOfferNullValueThrows
+        queue.offer(new Object(), 1, null);
+    }
+
+    /**
+     * This is how it seems to be handled in java.util.concurrent: a negative timeout is the same as
+     * just trying once without waiting
+     */
+    @Test
+    public void timedOfferNegativeTimeIgnored() throws Exception
+    {
+        queue.offer(weighable(10));
+        queue.offer(new Object(), -1, TimeUnit.SECONDS);
+    }
+
+    /**
+     * This also tests that natural weight (weighable interface) is respected
+     */
+    @Test
+    public void timedOfferFullFails() throws Exception
+    {
+        assertTrue(queue.offer(weighable(10), 1, TimeUnit.MICROSECONDS));
+        assertFalse(queue.offer(weighable(1), 1, TimeUnit.MICROSECONDS));
+    }
+
+    /**
+     * A timed offer against a full queue should block and then complete once a poll frees weight
+     */
+    @Test
+    public void timedOfferEventuallySucceeds() throws Exception
+    {
+        //Use up the whole weight budget of 10 so the next offer has to wait
+        assertTrue(queue.offer(weighable(10), 1, TimeUnit.MICROSECONDS));
+        Thread t = new Thread(() ->
+          {
+              try
+              {
+                  queue.offer(weighable(1), 1, TimeUnit.DAYS);
+              }
+              catch (InterruptedException e)
+              {
+                  e.printStackTrace();
+              }
+          });
+        t.start();
+        Thread.sleep(100);
+        assertTrue(t.getState() != Thread.State.TERMINATED);
+        //Removing the queued element should let the blocked offer finish
+        queue.poll();
+        t.join(60000);
+        //Expected value goes first so a failure reports the states the right way round
+        assertEquals(Thread.State.TERMINATED, t.getState());
+    }
+
+    /**
+     * Validate permits aren't leaked and return values are correct
+     */
+    @Test
+    public void testTimedOfferWrappedQueueRefuses() throws Exception
+    {
+        queue = new WeightedQueue<>(10, new BadQueue(true), 
WeightedQueue.NATURAL_WEIGHER);
+        assertEquals(10, queue.availableWeight.availablePermits());
+        assertFalse(queue.offer(new Object(), 1, TimeUnit.MICROSECONDS));
+        assertEquals(10, queue.availableWeight.availablePermits());
+    }
+
+    /**
+     * Validate permits aren't leaked and return values are correct
+     */
+    @Test
+    public void testTimedOfferWrappedQueueThrows() throws Exception
+    {
+        queue = new WeightedQueue<>(10, new BadQueue(false), 
WeightedQueue.NATURAL_WEIGHER);
+        assertEquals(10, queue.availableWeight.availablePermits());
+        try
+        {
+            assertFalse(queue.offer(new Object(), 1, TimeUnit.MICROSECONDS));
+            fail();
+        }
+        catch (UnsupportedOperationException e)
+        {
+            //expected and desired
+        }
+        assertEquals(10, queue.availableWeight.availablePermits());
+    }
+
+
+    @Test
+    public void testPoll() throws Exception
+    {
+        assertEquals(10, queue.availableWeight.availablePermits());
+        assertNull(queue.poll());
+        assertEquals(10, queue.availableWeight.availablePermits());
+        Object o = new Object();
+        assertTrue(queue.offer(o));
+        assertEquals(9, queue.availableWeight.availablePermits());
+        WeightedQueue.Weighable weighable = weighable(9);
+        assertTrue(queue.offer(weighable));
+        assertEquals(0, queue.availableWeight.availablePermits());
+        assertEquals(o, queue.poll());
+        assertEquals(1, queue.availableWeight.availablePermits());
+        assertEquals(weighable, queue.poll());
+        assertEquals(10, queue.availableWeight.availablePermits());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testPutNullThrows() throws Exception
+    {
+        queue.put(null);
+    }
+
+    @Test
+    public void testPutFullBlocks() throws Exception
+    {
+        WeightedQueue.Weighable weighable = weighable(10);
+        assertEquals(10, queue.availableWeight.availablePermits());
+        queue.put(weighable);
+        assertEquals(0, queue.availableWeight.availablePermits());
+        Object o = new Object();
+        Thread t = new Thread(() -> {
+            try
+            {
+                queue.put(o);
+            } catch (InterruptedException e)
+            {
+                e.printStackTrace();
+            }
+        });
+        t.start();
+        Thread.sleep(100);
+        assertTrue(t.getState() != Thread.State.TERMINATED);
+        assertEquals(0, queue.availableWeight.availablePermits());
+        assertEquals(weighable, queue.poll());
+        assertTrue(queue.availableWeight.availablePermits() > 0);
+        t.join();
+        assertEquals(o, queue.poll());
+        assertEquals(10, queue.availableWeight.availablePermits());
+    }
+
+    @Test
+    public void testPutWrappedQueueThrows() throws Exception
+    {
+        queue = new WeightedQueue<>(10, new BadQueue(false), 
WeightedQueue.NATURAL_WEIGHER);
+        assertEquals(10, queue.availableWeight.availablePermits());
+        try
+        {
+            queue.put(new Object());
+            fail();
+        }
+        catch (UnsupportedOperationException e)
+        {
+            //expected and desired
+        }
+        assertEquals(10, queue.availableWeight.availablePermits());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testTryAcquireWeightIllegalWeight()
+    {
+        queue.tryAcquireWeight(weighable(-1));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testAcquireWeightIllegalWeight() throws Exception
+    {
+        queue.acquireWeight(weighable(-1), 1, TimeUnit.DAYS);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testReleaseWeightIllegalWeight()
+    {
+        queue.releaseWeight(weighable(-1));
+    }
+
+    /**
+     * take() should block while the queue is empty and return once an element is offered, leaving
+     * all of the weight available afterwards
+     */
+    @Test
+    public void testTake() throws Exception
+    {
+        Thread t = new Thread(() -> {
+            try
+            {
+                queue.take();
+            }
+            catch (InterruptedException e)
+            {
+                e.printStackTrace();
+            }
+        });
+        t.start();
+        Thread.sleep(500);
+        //Nothing has been offered yet, so the taker should still be running and no weight is in use
+        assertTrue(t.getState() != Thread.State.TERMINATED);
+        assertEquals(10, queue.availableWeight.availablePermits());
+        queue.offer(new Object());
+        t.join(60 * 1000);
+        //Expected value goes first so a failure reports the states the right way round
+        assertEquals(Thread.State.TERMINATED, t.getState());
+        assertEquals(10, queue.availableWeight.availablePermits());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConstructorLTZeroWeightThrows() throws Exception
+    {
+        new WeightedQueue(0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConstructor2LTZeroWeightThrows() throws Exception
+    {
+        new WeightedQueue(0, new LinkedBlockingQueue<>(), 
WeightedQueue.NATURAL_WEIGHER);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testConstructorNullQueueThrows() throws Exception
+    {
+        new WeightedQueue(1, null, WeightedQueue.NATURAL_WEIGHER);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testConstructorNullWeigherThrows() throws Exception
+    {
+        new WeightedQueue(1, new LinkedBlockingQueue<>(), null);
+    }
+
+    /**
+     * A blocking queue that throws or refuses on every method
+     */
+    private static class BadQueue implements BlockingQueue<Object>
+    {
+        /**
+         * When true, both {@code offer} overloads return {@code false} instead of throwing.
+         * Every other method throws {@link UnsupportedOperationException} regardless of this flag.
+         */
+        private final boolean refuse;
+
+        /**
+         * @param refuse true to make the {@code offer} overloads return false rather than throw
+         */
+        private BadQueue(boolean refuse)
+        {
+            this.refuse = refuse;
+        }
+
+        @Override
+        public boolean add(Object o)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * @return false when this queue was built with {@code refuse == true}
+         * @throws UnsupportedOperationException when this queue was built with {@code refuse == false}
+         */
+        @Override
+        public boolean offer(Object o)
+        {
+            if (refuse)
+            {
+                return false;
+            }
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Object remove()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Object poll()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Object element()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Object peek()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void put(Object o) throws InterruptedException
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * The timeout is ignored; this never blocks.
+         *
+         * @return false when this queue was built with {@code refuse == true}
+         * @throws UnsupportedOperationException when this queue was built with {@code refuse == false}
+         */
+        @Override
+        public boolean offer(Object o, long timeout, TimeUnit unit) throws InterruptedException
+        {
+            if (refuse)
+            {
+                return false;
+            }
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Object take() throws InterruptedException
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Object poll(long timeout, TimeUnit unit) throws InterruptedException
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int remainingCapacity()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean remove(Object o)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean addAll(Collection<? extends Object> c)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void clear()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean retainAll(Collection<?> c)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean removeAll(Collection<?> c)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean containsAll(Collection<?> c)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int size()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isEmpty()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean contains(Object o)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Iterator<Object> iterator()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Object[] toArray()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> T[] toArray(T[] a)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int drainTo(Collection<? super Object> c)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int drainTo(Collection<? super Object> c, int maxElements)
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to