Repository: cassandra Updated Branches: refs/heads/cassandra-13983-rebased2 [created] ae837806b
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/src/java/org/apache/cassandra/utils/concurrent/WeightedQueue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/concurrent/WeightedQueue.java b/src/java/org/apache/cassandra/utils/concurrent/WeightedQueue.java new file mode 100644 index 0000000..3a6505e --- /dev/null +++ b/src/java/org/apache/cassandra/utils/concurrent/WeightedQueue.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.concurrent; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Weighted queue is a wrapper around any blocking queue that turns it into a blocking weighted queue. The queue + * will weigh each element being added and removed. Adding to the queue is blocked if adding would violate + * the weight bound. + * + * If an element weighs in at larger than the capacity of the queue then exactly one such element will be allowed + * into the queue at a time. + * + * If the weight of an object changes after it is added you are going to have a bad time. Checking weight should be + * cheap so memoize expensive to compute weights. If weight throws that can also result in leaked permits so it's + * always a good idea to memoize weight so it doesn't throw. + * + * In the interests of not writing unit tests for methods no one uses there is a lot of UnsupportedOperationException. + * If you need them then add them and add proper unit tests to WeightedQueueTest. "Good" tests. 100% coverage including + * exception paths and resource leaks. + **/ +public class WeightedQueue<T> implements BlockingQueue<T> +{ + private static final Logger logger = LoggerFactory.getLogger(WeightedQueue.class); + public static final Weigher NATURAL_WEIGHER = (Weigher<Object>) weighable -> + { + if (weighable instanceof Weighable) + { + return ((Weighable)weighable).weight(); + } + return 1; + }; + + private final Weigher<T> weigher; + private final BlockingQueue<T> queue; + private final int maxWeight; + final Semaphore availableWeight; + + public boolean add(T e) + { + throw new UnsupportedOperationException(); + } + + public boolean offer(T t) + { + Preconditions.checkNotNull(t); + boolean acquired = tryAcquireWeight(t); + if (acquired) + { + boolean offered = false; + try + { + offered = queue.offer(t); + return offered; + } + finally + { + if (!offered) + { + releaseWeight(t); + } + } + } + return false; + } + + public T remove() + { + throw new UnsupportedOperationException(); + } + + public T poll() + { + T retval = queue.poll(); + releaseWeight(retval); + return retval; + } + + public T element() + { + throw new UnsupportedOperationException(); + } + + public T peek() + { + throw new UnsupportedOperationException(); + } + + public void put(T t) throws InterruptedException + { + Preconditions.checkNotNull(t); + acquireWeight(t, 0, null); + boolean put = false; + try + { + queue.put(t); + put = true; + } + finally + { + if (!put) + { + releaseWeight(t); + } + } + } + + public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException + { + Preconditions.checkNotNull(t); + Preconditions.checkNotNull(unit); + boolean acquired = acquireWeight(t, timeout, unit); + if (acquired) + { + boolean offered = false; + try + { + offered = queue.offer(t, timeout, unit); + return offered; + } + finally + { + if (!offered) + { + releaseWeight(t); + } + } + } + return false; + } + + public T take() throws InterruptedException + { + T retval = queue.take(); + releaseWeight(retval); + return retval; + } + + public T poll(long timeout, TimeUnit unit) throws InterruptedException + { + throw new UnsupportedOperationException(); + } + + public int remainingCapacity() + { + throw new UnsupportedOperationException("Seems like a bad idea"); + } + + public boolean remove(Object o) + { + throw new UnsupportedOperationException(); + } + + public boolean containsAll(Collection<?> c) + { + throw new UnsupportedOperationException("Seems like a bad idea"); + } + + public boolean addAll(Collection<? extends T> c) + { + throw new UnsupportedOperationException(); + } + + public boolean removeAll(Collection<?> c) + { + throw new UnsupportedOperationException("Seems like a bad idea"); + } + + public boolean retainAll(Collection<?> c) + { + throw new UnsupportedOperationException("Seems like a bad idea"); + } + + public void clear() + { + throw new UnsupportedOperationException(); + } + + public int size() + { + throw new UnsupportedOperationException(); + } + + public boolean isEmpty() + { + throw new UnsupportedOperationException(); + } + + public boolean contains(Object o) + { + throw new UnsupportedOperationException("Seems like a bad idea"); + } + + public Iterator<T> iterator() + { + throw new UnsupportedOperationException(); + } + + public Object[] toArray() + { + throw new UnsupportedOperationException(); + } + + public <T1> T1[] toArray(T1[] a) + { + throw new UnsupportedOperationException(); + } + + public int drainTo(Collection<? super T> c) + { + throw new UnsupportedOperationException(); + } + + public int drainTo(Collection<? super T> c, int maxElements) + { + int count = 0; + T o; + while(count < maxElements && (o = poll()) != null) + { + c.add(o); + count++; + } + return count; + } + + public interface Weigher<T> + { + int weigh(T weighable); + } + + public interface Weighable + { + int weight(); + } + + public WeightedQueue(int maxWeight) + { + this(maxWeight, new LinkedBlockingQueue<T>(), NATURAL_WEIGHER); + } + + public WeightedQueue(int maxWeight, BlockingQueue<T> queue, Weigher<T> weigher) + { + Preconditions.checkNotNull(queue); + Preconditions.checkNotNull(weigher); + Preconditions.checkArgument(maxWeight > 0); + this.maxWeight = maxWeight; + this.queue = queue; + this.weigher = weigher; + availableWeight = new Semaphore(maxWeight); + } + + boolean acquireWeight(T weighable, long timeout, TimeUnit unit) throws InterruptedException + { + int weight = weigher.weigh(weighable); + if (weight < 1) + { + throw new IllegalArgumentException(String.format("Weighable: \"%s\" had illegal weight %d", Objects.toString(weighable), weight)); + } + + //Allow exactly one overweight element + weight = Math.min(maxWeight, weight); + + if (unit != null) + { + return availableWeight.tryAcquire(weight, timeout, unit); + } + else + { + availableWeight.acquire(weight); + return true; + } + } + + boolean tryAcquireWeight(T weighable) + { + int weight = weigher.weigh(weighable); + if (weight < 1) + { + throw new IllegalArgumentException(String.format("Weighable: \"%s\" had illegal weight %d", Objects.toString(weighable), weight)); + } + + //Allow exactly one overweight element + weight = Math.min(maxWeight, weight); + + return availableWeight.tryAcquire(weight); + } + + void releaseWeight(T weighable) + { + if (weighable == null) + { + return; + } + + int weight = weigher.weigh(weighable); + if (weight < 1) + { + throw new IllegalArgumentException(String.format("Weighable: \"%s\" had illegal weight %d", Objects.toString(weighable), weight)); + } + + //Allow exactly one overweight element + weight = Math.min(maxWeight, weight); + + availableWeight.release(weight); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index 93115c0..7e62c41 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -551,13 +551,15 @@ public class Util public static void spinAssertEquals(Object expected, Supplier<Object> s, int timeoutInSeconds) { long start = System.currentTimeMillis(); + Object lastValue = null; while (System.currentTimeMillis() < start + (1000 * timeoutInSeconds)) { - if (s.get().equals(expected)) + lastValue = s.get(); + if (lastValue.equals(expected)) break; Thread.yield(); } - assertEquals(expected, s.get()); + assertEquals(expected, lastValue); } public static void joinThread(Thread thread) throws InterruptedException http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/test/unit/org/apache/cassandra/db/fullquerylog/FullQueryLoggerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/fullquerylog/FullQueryLoggerTest.java b/test/unit/org/apache/cassandra/db/fullquerylog/FullQueryLoggerTest.java new file mode 100644 index 0000000..175cf8c --- /dev/null +++ b/test/unit/org/apache/cassandra/db/fullquerylog/FullQueryLoggerTest.java @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.fullquerylog; + + +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.buffer.Unpooled; +import net.openhft.chronicle.queue.ChronicleQueue; +import net.openhft.chronicle.queue.ChronicleQueueBuilder; +import net.openhft.chronicle.queue.ExcerptTailer; +import net.openhft.chronicle.queue.RollCycles; +import net.openhft.chronicle.wire.ValueIn; +import net.openhft.chronicle.wire.WireOut; +import org.apache.cassandra.Util; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.db.fullquerylog.FullQueryLogger.WeighableMarshallableQuery; +import org.apache.cassandra.db.fullquerylog.FullQueryLogger.WeighableMarshallableBatch; +import org.apache.cassandra.transport.ProtocolVersion; +import org.apache.cassandra.transport.Server; +import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.binlog.BinLogTest; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class FullQueryLoggerTest +{ + private static Path tempDir; + + @BeforeClass + public static void beforeClass() throws Exception + { + tempDir = BinLogTest.tempDir(); + } + + @After + public void tearDown() + { + FullQueryLogger.instance.reset(tempDir.toString()); + } + + @Test(expected = NullPointerException.class) + public void testConfigureNullPath() throws Exception + { + FullQueryLogger.instance.configure(null, "", true, 1, 1); + } + + @Test(expected = NullPointerException.class) + public void testConfigureNullRollCycle() throws Exception + { + FullQueryLogger.instance.configure(BinLogTest.tempDir(), null, true, 1, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testConfigureInvalidRollCycle() throws Exception + { + FullQueryLogger.instance.configure(BinLogTest.tempDir(), "foobar", true, 1, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testConfigureInvalidMaxQueueWeight() throws Exception + { + FullQueryLogger.instance.configure(BinLogTest.tempDir(), "DAILY", true, 0, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testConfigureInvalidMaxQueueLogSize() throws Exception + { + FullQueryLogger.instance.configure(BinLogTest.tempDir(), "DAILY", true, 1, 0); + } + + @Test(expected = IllegalArgumentException.class) + public void testConfigureOverExistingFile() throws Exception + { + File f = File.createTempFile("foo", "bar"); + f.deleteOnExit(); + FullQueryLogger.instance.configure(f.toPath(), "TEST_SECONDLY", true, 1, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testCanRead() throws Exception + { + tempDir.toFile().setReadable(false); + try + { + configureFQL(); + } + finally + { + tempDir.toFile().setReadable(true); + } + } + + @Test(expected = IllegalArgumentException.class) + public void testCanWrite() throws Exception + { + tempDir.toFile().setWritable(false); + try + { + configureFQL(); + } + finally + { + tempDir.toFile().setWritable(true); + } + } + + @Test(expected = IllegalArgumentException.class) + public void testCanExecute() throws Exception + { + tempDir.toFile().setExecutable(false); + try + { + configureFQL(); + } + finally + { + tempDir.toFile().setExecutable(true); + } + } + + @Test + public void testResetWithoutConfigure() throws Exception + { + FullQueryLogger.instance.reset(tempDir.toString()); + FullQueryLogger.instance.reset(tempDir.toString()); + } + + @Test + public void stopWithoutConfigure() throws Exception + { + FullQueryLogger.instance.stop(); + FullQueryLogger.instance.stop(); + } + + /** + * Both the last used and supplied directory should get cleaned + */ + @Test + public void testResetCleansPaths() throws Exception + { + configureFQL(); + File tempA = File.createTempFile("foo", "bar", tempDir.toFile()); + assertTrue(tempA.exists()); + File tempB = File.createTempFile("foo", "bar", BinLogTest.tempDir().toFile()); + FullQueryLogger.instance.reset(tempB.getParent()); + assertFalse(tempA.exists()); + assertFalse(tempB.exists()); + } + + /** + * The last used and configured directory are the same and it shouldn't be an issue + */ + @Test + public void testResetSamePath() throws Exception + { + configureFQL(); + File tempA = File.createTempFile("foo", "bar", tempDir.toFile()); + assertTrue(tempA.exists()); + FullQueryLogger.instance.reset(tempA.getParent()); + assertFalse(tempA.exists()); + } + + @Test(expected = IllegalStateException.class) + public void testDoubleConfigure() throws Exception + { + configureFQL(); + configureFQL(); + } + + @Test + public void testCleansDirectory() throws Exception + { + assertTrue(new File(tempDir.toFile(), "foobar").createNewFile()); + configureFQL(); + assertEquals(tempDir.toFile().listFiles().length, 1); + assertEquals("directory-listing.cq4t", tempDir.toFile().listFiles()[0].getName()); + } + + @Test + public void testEnabledReset() throws Exception + { + assertFalse(FullQueryLogger.instance.enabled()); + configureFQL(); + assertTrue(FullQueryLogger.instance.enabled()); + FullQueryLogger.instance.reset(tempDir.toString()); + assertFalse(FullQueryLogger.instance.enabled()); + } + + @Test + public void testEnabledStop() throws Exception + { + assertFalse(FullQueryLogger.instance.enabled()); + configureFQL(); + assertTrue(FullQueryLogger.instance.enabled()); + FullQueryLogger.instance.stop(); + assertFalse(FullQueryLogger.instance.enabled()); + } + + /** + * Test that a thread will block if the FQL is over weight, and unblock once the backup is cleared + */ + @Test + public void testBlocking() throws Exception + { + configureFQL(); + //Prevent the bin log thread from making progress, causing the task queue to block + Semaphore blockBinLog = new Semaphore(0); + try + { + //Find out when the bin log thread has been blocked, necessary to not run into batch task drain behavior + Semaphore binLogBlocked = new Semaphore(0); + FullQueryLogger.instance.binLog.put(new WeighableMarshallableQuery("foo1", QueryOptions.DEFAULT, 1) + { + + public void writeMarshallable(WireOut wire) + { + //Notify that the bin log is blocking now + binLogBlocked.release(); + try + { + //Block the bin log thread so the task queue can be filled + blockBinLog.acquire(); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + super.writeMarshallable(wire); + } + + public void release() + { + super.release(); + } + }); + + //Wait for the bin log thread to block so it can't batch drain tasks + Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60); + + //Now fill the task queue + logQuery("foo2"); + + //Start a thread to block waiting on the bin log queue + Thread t = new Thread(() -> + { + logQuery("foo3"); + //Should be able to log another query without an issue + logQuery("foo4"); + }); + t.start(); + Thread.sleep(500); + //If thread state is terminated then the thread started, finished, and didn't block on the full task queue + assertTrue(t.getState() != Thread.State.TERMINATED); + } + finally + { + //Unblock the binlog thread + blockBinLog.release(); + } + Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2", "foo3", "foo4")), 60); + } + + private boolean checkForQueries(List<String> queries) + { + try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build()) + { + ExcerptTailer tailer = queue.createTailer(); + List<String> expectedQueries = new LinkedList<>(queries); + while (!expectedQueries.isEmpty()) + { + if (!tailer.readDocument(wire -> { + assertEquals(expectedQueries.get(0), wire.read("query").text()); + expectedQueries.remove(0); + })) + { + return false; + } + } + assertFalse(tailer.readDocument(wire -> {})); + return true; + } + } + + @Test + public void testNonBlocking() throws Exception + { + FullQueryLogger.instance.configure(tempDir, "TEST_SECONDLY", false, 1, 1024 * 1024 * 256); + //Prevent the bin log thread from making progress, causing the task queue to refuse tasks + Semaphore blockBinLog = new Semaphore(0); + try + { + //Find out when the bin log thread has been blocked, necessary to not run into batch task drain behavior + Semaphore binLogBlocked = new Semaphore(0); + FullQueryLogger.instance.binLog.put(new WeighableMarshallableQuery("foo1", QueryOptions.DEFAULT, 1) + { + + public void writeMarshallable(WireOut wire) + { + //Notify that the bin log is blocking now + binLogBlocked.release(); + try + { + //Block the bin log thread so the task queue can be filled + blockBinLog.acquire(); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + super.writeMarshallable(wire); + } + + public void release() + { + super.release(); + } + }); + + //Wait for the bin log thread to block so it can't batch drain tasks + Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60); + + //Now fill the task queue + logQuery("foo2"); + + //This sample should get dropped AKA released without being written + AtomicInteger releasedCount = new AtomicInteger(0); + AtomicInteger writtenCount = new AtomicInteger(0); + FullQueryLogger.instance.logRecord(new WeighableMarshallableQuery("foo3", QueryOptions.DEFAULT, 1) { + public void writeMarshallable(WireOut wire) + { + writtenCount.incrementAndGet(); + super.writeMarshallable(wire); + } + + public void release() + { + releasedCount.incrementAndGet(); + super.release(); + } + }, FullQueryLogger.instance.binLog); + + Util.spinAssertEquals(1, releasedCount::get, 60); + assertEquals(0, writtenCount.get()); + } + finally + { + blockBinLog.release(); + } + //Wait for tasks to drain so there should be space in the queue + Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2")), 60); + //Should be able to log again + logQuery("foo4"); + Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo1", "foo2", "foo4")), 60); + } + + @Test + public void testRoundTripQuery() throws Exception + { + configureFQL(); + logQuery("foo"); + Util.spinAssertEquals(true, () -> checkForQueries(Arrays.asList("foo")), 60); + try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build()) + { + ExcerptTailer tailer = queue.createTailer(); + assertTrue(tailer.readDocument(wire -> { + assertEquals("single", wire.read("type").text()); + ProtocolVersion protocolVersion = ProtocolVersion.decode(wire.read("protocol-version").int32()); + assertEquals(ProtocolVersion.CURRENT, protocolVersion); + QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read("query-options").bytes()), protocolVersion); + compareQueryOptions(QueryOptions.DEFAULT, queryOptions); + assertEquals(1L, wire.read("query-time").int64()); + assertEquals("foo", wire.read("query").text()); + })); + } + } + + @Test + public void testRoundTripBatch() throws Exception + { + configureFQL(); + FullQueryLogger.instance.logBatch("UNLOGGED", Arrays.asList("foo1", "foo2"), Arrays.asList(Arrays.asList(ByteBuffer.allocate(1) , ByteBuffer.allocateDirect(2)), Arrays.asList()), QueryOptions.DEFAULT, 1); + Util.spinAssertEquals(true, () -> + { + try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build()) + { + return queue.createTailer().readingDocument().isPresent(); + } + }, 60); + try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build()) + { + ExcerptTailer tailer = queue.createTailer(); + assertTrue(tailer.readDocument(wire -> { + assertEquals("batch", wire.read("type").text()); + ProtocolVersion protocolVersion = ProtocolVersion.decode(wire.read("protocol-version").int32()); + assertEquals(ProtocolVersion.CURRENT, protocolVersion); + QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read("query-options").bytes()), protocolVersion); + assertEquals(1L, wire.read("query-time").int64()); + compareQueryOptions(QueryOptions.DEFAULT, queryOptions); + assertEquals("UNLOGGED", wire.read("batch-type").text()); + ValueIn in = wire.read("queries"); + assertEquals(2, in.int32()); + assertEquals("foo1", in.text()); + assertEquals("foo2", in.text()); + in = wire.read("values"); + assertEquals(2, in.int32()); + assertEquals(2, in.int32()); + assertTrue(Arrays.equals(new byte[1], in.bytes())); + assertTrue(Arrays.equals(new byte[2], in.bytes())); + assertEquals(0, in.int32()); + })); + } + } + + @Test + public void testQueryWeight() + { + //Empty query should have some weight + WeighableMarshallableQuery query = new WeighableMarshallableQuery("", QueryOptions.DEFAULT, 1); + assertTrue(query.weight() >= 95); + + StringBuilder sb = new StringBuilder(); + for (int ii = 0; ii < 1024 * 1024; ii++) + { + sb.append('a'); + } + query = new WeighableMarshallableQuery(sb.toString(), QueryOptions.DEFAULT, 1); + + //A large query should be reflected in the size, * 2 since characters are still two bytes + assertTrue(query.weight() > ObjectSizes.measureDeep(sb.toString())); + + //Large query options should be reflected + QueryOptions largeOptions = QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024))); + query = new WeighableMarshallableQuery("", largeOptions, 1); + assertTrue(query.weight() > 1024 * 1024); + System.out.printf("weight %d%n", query.weight()); + } + + @Test + public void testBatchWeight() + { + //An empty batch should have weight + WeighableMarshallableBatch batch = new WeighableMarshallableBatch("", new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1); + assertTrue(batch.weight() >= 183); + + StringBuilder sb = new StringBuilder(); + for (int ii = 0; ii < 1024 * 1024; ii++) + { + sb.append('a'); + } + + //The weight of the type string should be reflected + batch = new WeighableMarshallableBatch(sb.toString(), new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1); + assertTrue(batch.weight() > ObjectSizes.measureDeep(sb.toString())); + + //The weight of the list containing queries should be reflected + List<String> bigList = new ArrayList(100000); + for (int ii = 0; ii < 100000; ii++) + { + bigList.add(""); + } + batch = new WeighableMarshallableBatch("", bigList, new ArrayList<>(), QueryOptions.DEFAULT, 1); + assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList)); + + //The size of the query should be reflected + bigList = new ArrayList(1); + bigList.add(sb.toString()); + batch = new WeighableMarshallableBatch("", bigList, new ArrayList<>(), QueryOptions.DEFAULT, 1); + assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList)); + + bigList = null; + //The size of the list of values should be reflected + List<List<ByteBuffer>> bigValues = new ArrayList<>(100000); + for (int ii = 0; ii < 100000; ii++) + { + bigValues.add(new ArrayList<>(0)); + } + bigValues.get(0).add(ByteBuffer.allocate(1024 * 1024 * 5)); + batch = new WeighableMarshallableBatch("", new ArrayList<>(), bigValues, QueryOptions.DEFAULT, 1); + assertTrue(batch.weight() > ObjectSizes.measureDeep(bigValues)); + + //As should the size of the values + QueryOptions largeOptions = QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024))); + batch = new WeighableMarshallableBatch("", new ArrayList<>(), new ArrayList<>(), largeOptions, 1); + assertTrue(batch.weight() > 1024 * 1024); + } + + @Test(expected = NullPointerException.class) + public void testLogBatchNullType() throws Exception + { + FullQueryLogger.instance.logBatch(null, new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1); + } + + @Test(expected = NullPointerException.class) + public void testLogBatchNullQueries() throws Exception + { + FullQueryLogger.instance.logBatch("", null, new ArrayList<>(), QueryOptions.DEFAULT, 1); + } + + @Test(expected = NullPointerException.class) + public void testLogBatchNullQueriesQuery() throws Exception + { + configureFQL(); + FullQueryLogger.instance.logBatch("", Arrays.asList((String)null), new ArrayList<>(), QueryOptions.DEFAULT, 1); + } + + @Test(expected = NullPointerException.class) + public void testLogBatchNullValues() throws Exception + { + FullQueryLogger.instance.logBatch("", new ArrayList<>(), null, QueryOptions.DEFAULT, 1); + } + + @Test(expected = NullPointerException.class) + public void testLogBatchNullValuesValue() throws Exception + { + FullQueryLogger.instance.logBatch("", new ArrayList<>(), Arrays.asList((List<ByteBuffer>)null), null, 1); + } + + @Test(expected = NullPointerException.class) + public void testLogBatchNullQueryOptions() throws Exception + { + FullQueryLogger.instance.logBatch("", new ArrayList<>(), new ArrayList<>(), null, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testLogBatchNegativeTime() throws Exception + { + FullQueryLogger.instance.logBatch("", new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, -1); + } + + @Test(expected = NullPointerException.class) + public void testLogQueryNullQuery() throws Exception + { + FullQueryLogger.instance.logQuery(null, QueryOptions.DEFAULT, 1); + } + + @Test(expected = NullPointerException.class) + public void testLogQueryNullQueryOptions() throws Exception + { + FullQueryLogger.instance.logQuery("", null, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testLogQueryNegativeTime() throws Exception + { + FullQueryLogger.instance.logQuery("", QueryOptions.DEFAULT, -1); + } + + private static void compareQueryOptions(QueryOptions a, QueryOptions b) + { + assertEquals(a.getClass(), b.getClass()); + assertEquals(a.getProtocolVersion(), b.getProtocolVersion()); + assertEquals(a.getPageSize(), b.getPageSize()); + assertEquals(a.getConsistency(), b.getConsistency()); + assertEquals(a.getPagingState(), b.getPagingState()); + assertEquals(a.getValues(), b.getValues()); + assertEquals(a.getSerialConsistency(), b.getSerialConsistency()); + } + + private void configureFQL() throws Exception + { + FullQueryLogger.instance.configure(tempDir, "TEST_SECONDLY", true, 1, 1024 * 1024 * 256); + } + + private void logQuery(String query) + { + FullQueryLogger.instance.logQuery(query, QueryOptions.DEFAULT, 1); + } + +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java b/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java new file mode 100644 index 0000000..204e27b --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.binlog; + +import java.io.File; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import net.openhft.chronicle.queue.ChronicleQueue; +import net.openhft.chronicle.queue.ChronicleQueueBuilder; +import net.openhft.chronicle.queue.ExcerptTailer; +import net.openhft.chronicle.queue.RollCycles; +import net.openhft.chronicle.wire.WireOut; +import org.apache.cassandra.Util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class BinLogTest +{ + public static Path tempDir() throws Exception + { + File f = File.createTempFile("foo", "bar"); + f.delete(); + f.mkdir(); + return Paths.get(f.getPath()); + } + + private static final String testString = "ry@nlikestheyankees"; + private static final String testString2 = testString + "1"; + + private BinLog binLog; + private Path path; + + @Before + public void setUp() throws Exception + { + path = tempDir(); + binLog = new BinLog(path, RollCycles.TEST_SECONDLY, 10, 1024 * 1024 * 128); + binLog.start(); + } + + @After + public void tearDown() throws Exception + { + if (binLog != null) + { + binLog.stop(); + } + for (File f : path.toFile().listFiles()) + { + f.delete(); + } + } + + @Test(expected = NullPointerException.class) + public void testConstructorNullPath() throws Exception + { + new BinLog(null, RollCycles.TEST_SECONDLY, 1, 1); + } + + @Test(expected = NullPointerException.class) + public void testConstructorNullRollCycle() throws Exception + { + new BinLog(tempDir(), null, 1, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructorZeroWeight() throws Exception + { + new BinLog(tempDir(), RollCycles.TEST_SECONDLY, 0, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructorLogSize() throws Exception + { + new BinLog(tempDir(), RollCycles.TEST_SECONDLY, 1, 0); + } + + /** + * Check that we can start and stop the bin log and that it releases resources held by any subsequent appended + * records + */ + @Test + public void testBinLogStartStop() throws Exception + { + Semaphore blockBinLog = new Semaphore(1); + AtomicInteger releaseCount = new AtomicInteger(); + binLog.put(new BinLog.ReleaseableWriteMarshallable() + { + protected void release() + { + releaseCount.incrementAndGet(); + } + + public void writeMarshallable(WireOut wire) + { + try + { + blockBinLog.acquire(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + }); + binLog.put(new BinLog.ReleaseableWriteMarshallable() + { + + public void writeMarshallable(WireOut wire) + { + + } + + protected void release() + { + releaseCount.incrementAndGet(); + } + }); + Thread.sleep(1000); + assertEquals(2, releaseCount.get()); + Thread t = new Thread(() -> { + try + { + binLog.stop(); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + }); + t.start(); + t.join(60 * 1000); + assertEquals(t.getState(), Thread.State.TERMINATED); + + Util.spinAssertEquals(2, releaseCount::get, 60); + Util.spinAssertEquals(Thread.State.TERMINATED, binLog.binLogThread::getState, 60); + } + + /** + * Check that the finalizer releases any stragglers in the queue + */ + @Test + public void testBinLogFinalizer() throws Exception + { + binLog.stop(); + Semaphore released = new Semaphore(0); + binLog.sampleQueue.put(new BinLog.ReleaseableWriteMarshallable() + { + protected void release() + { + released.release(); + } + + public void writeMarshallable(WireOut wire) + { + + } + }); + binLog = null; + + for (int ii = 0; ii < 30; ii++) + { + System.gc(); + System.runFinalization(); + Thread.sleep(100); + if (released.tryAcquire()) + return; + } + fail("Finalizer never released resources"); + } + + /** + * Test that put blocks and unblocks and creates records + */ + @Test + public void testPut() throws Exception + { + binLog.put(record(testString)); + binLog.put(record(testString2)); + + Util.spinAssertEquals(2, () -> readBinLogRecords(path).size(), 60); + List<String> records = readBinLogRecords(path); + assertEquals(testString, records.get(0)); + assertEquals(testString2, records.get(1)); + + + //Prevent the bin log thread from making progress + Semaphore blockBinLog = new Semaphore(0); + //Get notified when the bin log thread has blocked and definitely won't batch drain tasks + Semaphore binLogBlocked = new Semaphore(0); + try + { + binLog.put(new BinLog.ReleaseableWriteMarshallable() + { + protected void release() + { + } + + public void writeMarshallable(WireOut wire) + { + //Notify the bing log thread is about to block + binLogBlocked.release(); + try + { + //Block the bin log thread so it doesn't process more tasks + blockBinLog.acquire(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + }); + + //Wait for the bin log thread to block so it doesn't batch drain + Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60); + + //Now fill the queue up to capacity and it shouldn't block + for (int ii = 0; ii < 10; ii++) + { + binLog.put(record(testString)); + } + + //Thread to block on the full queue + Thread t = new Thread(() -> + { + try + { + binLog.put(record(testString)); + //Should be able to do it again after unblocking + binLog.put(record(testString)); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + }); + t.start(); + Thread.sleep(500); + //If the thread is not terminated then it is probably blocked on the queue + assertTrue(t.getState() != Thread.State.TERMINATED); + } + finally + { + blockBinLog.release(); + } + + //Expect all the records to eventually be there including one from the blocked thread + Util.spinAssertEquals(15, () -> readBinLogRecords(path).size(), 60); + } + + @Test + public void testOffer() throws Exception + { + assertTrue(binLog.offer(record(testString))); + assertTrue(binLog.offer(record(testString2))); + + Util.spinAssertEquals(2, () -> readBinLogRecords(path).size(), 60); + List<String> records = readBinLogRecords(path); + assertEquals(testString, records.get(0)); + assertEquals(testString2, records.get(1)); + + //Prevent the bin log thread from making progress + Semaphore blockBinLog = new Semaphore(0); + //Get notified when the bin log thread has blocked and definitely won't batch drain tasks + Semaphore binLogBlocked = new Semaphore(0); + try + { + assertTrue(binLog.offer(new BinLog.ReleaseableWriteMarshallable() + { + protected void release() + { + } + + public void writeMarshallable(WireOut wire) + { + //Notify the bing log thread is about to block + binLogBlocked.release(); + try + { + //Block the bin log thread so it doesn't process more tasks + blockBinLog.acquire(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + })); + + //Wait for the bin log thread to block so it doesn't batch drain + Util.spinAssertEquals(true, binLogBlocked::tryAcquire, 60); + + //Now fill the queue up to capacity and it should always accept + for (int ii = 0; ii < 10; ii++) + { + assertTrue(binLog.offer(record(testString))); + } + + //it shoudl reject this record since it is full + assertFalse(binLog.offer(record(testString))); + } + finally + { + blockBinLog.release(); + } + Util.spinAssertEquals(13, () -> readBinLogRecords(path).size(), 60); + assertTrue(binLog.offer(record(testString))); + Util.spinAssertEquals(14, () -> readBinLogRecords(path).size(), 60); + } + + /** + * Set a very small segment size so on rolling the segments are always deleted + */ + @Test + public void testCleanupOnOversize() throws Exception + { + tearDown(); + binLog = new BinLog(path, RollCycles.TEST_SECONDLY, 10000, 1); + binLog.start(); + for (int ii = 0; ii < 5; ii++) + { + binLog.put(record(String.valueOf(ii))); + Thread.sleep(1001); + } + List<String> records = readBinLogRecords(path); + System.out.println("Records found are " + records); + assertTrue(records.size() < 5); + } + + @Test(expected = IllegalStateException.class) + public void testNoReuse() throws Exception + { + binLog.stop(); + binLog.start(); + } + + @Test + public void testOfferAfterStop() throws Exception + { + binLog.stop(); + assertFalse(binLog.offer(record(testString))); + } + + @Test + public void testPutAfterStop() throws Exception + { + binLog.stop(); + binLog.put(record(testString)); + assertEquals(null, binLog.sampleQueue.poll()); + } + + /** + * Test for a bug where files were deleted but the space was not reclaimed when tracking so + * all log segemnts were incorrectly deleted when rolled. + */ + @Test + public void testTrucationReleasesLogSpace() throws Exception + { + StringBuilder sb = new StringBuilder(); + for (int ii = 0; ii < 1024 * 1024 * 2; ii++) + { + sb.append('a'); + } + + String queryString = sb.toString(); + + //This should fill up the log so when it rolls in the future it will always delete the rolled segment; + for (int ii = 0; ii < 129; ii++) + { + binLog.put(record(queryString)); + } + + for (int ii = 0; ii < 2; ii++) + { + Thread.sleep(2000); + binLog.put(record(queryString)); + } + + Util.spinAssertEquals(2, () -> readBinLogRecords(path).size(), 60); + } + + static BinLog.ReleaseableWriteMarshallable record(String text) + { + return new BinLog.ReleaseableWriteMarshallable() + { + protected void release() + { + //Do nothing + } + + public void writeMarshallable(WireOut wire) + { + wire.write("text").text(text); + } + }; + } + + List<String> readBinLogRecords(Path path) + { + List<String> records = new ArrayList<String>(); + try (ChronicleQueue queue = ChronicleQueueBuilder.single(path.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build()) + { + ExcerptTailer tailer = queue.createTailer(); + while (true) + { + if (!tailer.readDocument(wire -> + { + records.add(wire.read("text").text()); + })) + { + return records; + } + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae837806/test/unit/org/apache/cassandra/utils/concurrent/WeightedQueueTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/concurrent/WeightedQueueTest.java b/test/unit/org/apache/cassandra/utils/concurrent/WeightedQueueTest.java new file mode 100644 index 0000000..544e95c --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/concurrent/WeightedQueueTest.java @@ -0,0 +1,656 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.concurrent; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class WeightedQueueTest +{ + private static WeightedQueue<Object> queue() + { + return new WeightedQueue<>(10); + } + + private WeightedQueue<Object> queue; + + @Before + public void setUp() + { + queue = queue(); + } + + private static WeightedQueue.Weighable weighable(int weight) + { + return () -> weight; + } + + @Test(expected = UnsupportedOperationException.class) + public void testAddUnsupported() throws Exception + { + queue.add(new Object()); + } + + @Test(expected = UnsupportedOperationException.class) + public void testRemoveUnsupported() throws Exception + { + queue.remove(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testElementUnsupported() throws Exception + { + queue.element(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testPeekUnsupported() throws Exception + { + queue.peek(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testRemainingCapacityUnsupported() throws Exception + { + queue.remainingCapacity(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testRemoveElementUnsupported() throws Exception + { + queue.remove(null); + } + + @Test(expected = UnsupportedOperationException.class) + public void testContainsAllUnsupported() throws Exception + { + queue.containsAll(null); + } + + @Test(expected = UnsupportedOperationException.class) + public void testAddAllUnsupported() throws Exception + { + queue.addAll(null); + } + + @Test(expected = UnsupportedOperationException.class) + public void testRemoveAllUnsupported() throws Exception + { + queue.removeAll(null); + } + + @Test(expected = UnsupportedOperationException.class) + public void testRetainAllUnsupported() throws Exception + { + queue.retainAll(null); + } + + @Test(expected = UnsupportedOperationException.class) + public void testClearUnsupported() throws Exception + { + queue.clear(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testSizeUnsupported() throws Exception + { + queue.size(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testIsEmptyUnsupported() throws Exception + { + queue.isEmpty(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testContainsUnsupported() throws Exception + { + queue.contains(null); + } + + @Test(expected = UnsupportedOperationException.class) + public void testIteratorUnsupported() throws Exception + { + queue.iterator(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testToArrayUnsupported() throws Exception + { + queue.toArray(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testToArray2Unsupported() throws Exception + { + queue.toArray( new Object[] {}); + } + + @Test(expected = UnsupportedOperationException.class) + public void testDrainToUnsupported() throws Exception + { + queue.drainTo(new ArrayList<>()); + } + + @Test(expected = UnsupportedOperationException.class) + public void testTimedPollUnsupported() throws Exception + { + queue.poll(1, TimeUnit.MICROSECONDS); + } + + @Test + public void testDrainToWithLimit() throws Exception + { + queue.offer(new Object()); + queue.offer(new Object()); + queue.offer(new Object()); + ArrayList<Object> list = new ArrayList<>(); + queue.drainTo(list, 1); + assertEquals(1, list.size()); + list.clear(); + queue.drainTo(list, 10); + assertEquals(2, list.size()); + } + + @Test(expected = NullPointerException.class) + public void offerNullThrows() throws Exception + { + queue.offer(null); + } + + /** + * This also tests that natural weight (weighable interface) is respected + */ + @Test + public void offerFullFails() throws Exception + { + assertTrue(queue.offer(weighable(10))); + assertFalse(queue.offer(weighable(1))); + } + + /** + * Validate permits aren't leaked and return values are correct + */ + @Test + public void testOfferWrappedQueueRefuses() throws Exception + { + queue = new WeightedQueue<>(10, new BadQueue(true), WeightedQueue.NATURAL_WEIGHER); + assertEquals(10, queue.availableWeight.availablePermits()); + assertFalse(queue.offer(new Object())); + assertEquals(10, queue.availableWeight.availablePermits()); + } + + /** + * Validate permits aren't leaked and return values are correct + */ + @Test + public void testOfferWrappedQueueThrows() throws Exception + { + queue = new WeightedQueue<>(10, new BadQueue(false), WeightedQueue.NATURAL_WEIGHER); + assertEquals(10, queue.availableWeight.availablePermits()); + try + { + assertFalse(queue.offer(new Object())); + fail(); + } + catch (UnsupportedOperationException e) + { + //expected and desired + } + assertEquals(10, queue.availableWeight.availablePermits()); + } + + /** + * If not weighable and not custom weigher the default weight is 1 + */ + @Test + public void defaultWeightRespected() throws Exception + { + for (int ii = 0; ii < 10; ii++) + { + assertTrue(queue.offer(new Object())); + } + assertFalse(queue.offer(new Object())); + } + + @Test + public void testCustomWeigher() throws Exception + { + queue = new WeightedQueue<>(10, new LinkedBlockingQueue<>(), weighable -> 10 ); + assertTrue(queue.offer(new Object())); + assertFalse(queue.offer(new Object())); + } + + @Test(expected = UnsupportedOperationException.class) + public void testCustomQueue() throws Exception + { + new WeightedQueue<>(10, new BadQueue(false), WeightedQueue.NATURAL_WEIGHER).offer(new Object()); + } + + @Test(expected = NullPointerException.class) + public void timedOfferNullValueThrows() throws Exception + { + queue.offer(null, 1, TimeUnit.SECONDS); + } + + @Test(expected = NullPointerException.class) + public void timedOfferNullTimeThrows() throws Exception + { + queue.offer(null, 1, null); + } + + /** + * This is how it seems to be handled in java.util.concurrent, it's the same as just try + */ + @Test + public void timedOfferNegativeTimeIgnored() throws Exception + { + queue.offer(weighable(10)); + queue.offer(new Object(), -1, TimeUnit.SECONDS); + } + + /** + * This also tests that natural weight (weighable interface) is respected + */ + @Test + public void timedOfferFullFails() throws Exception + { + assertTrue(queue.offer(weighable(10), 1, TimeUnit.MICROSECONDS)); + assertFalse(queue.offer(weighable(1), 1, TimeUnit.MICROSECONDS)); + } + + @Test + public void timedOfferEventuallySucceeds() throws Exception + { + assertTrue(queue.offer(weighable(10), 1, TimeUnit.MICROSECONDS)); + Thread t = new Thread(() -> + { + try + { + queue.offer(weighable(1), 1, TimeUnit.DAYS); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + }); + t.start(); + Thread.sleep(100); + assertTrue(t.getState() != Thread.State.TERMINATED); + queue.poll(); + t.join(60000); + assertEquals(t.getState(), Thread.State.TERMINATED); + } + + /** + * Validate permits aren't leaked and return values are correct + */ + @Test + public void testTimedOfferWrappedQueueRefuses() throws Exception + { + queue = new WeightedQueue<>(10, new BadQueue(true), WeightedQueue.NATURAL_WEIGHER); + assertEquals(10, queue.availableWeight.availablePermits()); + assertFalse(queue.offer(new Object(), 1, TimeUnit.MICROSECONDS)); + assertEquals(10, queue.availableWeight.availablePermits()); + } + + /** + * Validate permits aren't leaked and return values are correct + */ + @Test + public void testTimedOfferWrappedQueueThrows() throws Exception + { + queue = new WeightedQueue<>(10, new BadQueue(false), WeightedQueue.NATURAL_WEIGHER); + assertEquals(10, queue.availableWeight.availablePermits()); + try + { + assertFalse(queue.offer(new Object(), 1, TimeUnit.MICROSECONDS)); + fail(); + } + catch (UnsupportedOperationException e) + { + //expected and desired + } + assertEquals(10, queue.availableWeight.availablePermits()); + } + + + @Test + public void testPoll() throws Exception + { + assertEquals(10, queue.availableWeight.availablePermits()); + assertNull(queue.poll()); + assertEquals(10, queue.availableWeight.availablePermits()); + Object o = new Object(); + assertTrue(queue.offer(o)); + assertEquals(9, queue.availableWeight.availablePermits()); + WeightedQueue.Weighable weighable = weighable(9); + assertTrue(queue.offer(weighable)); + assertEquals(0, queue.availableWeight.availablePermits()); + assertEquals(o, queue.poll()); + assertEquals(1, queue.availableWeight.availablePermits()); + assertEquals(weighable, queue.poll()); + assertEquals(10, queue.availableWeight.availablePermits()); + } + + @Test(expected = NullPointerException.class) + public void testPutNullThrows() throws Exception + { + queue.put(null); + } + + @Test + public void testPutFullBlocks() throws Exception + { + WeightedQueue.Weighable weighable = weighable(10); + assertEquals(10, queue.availableWeight.availablePermits()); + queue.put(weighable); + assertEquals(0, queue.availableWeight.availablePermits()); + Object o = new Object(); + Thread t = new Thread(() -> { + try + { + queue.put(o); + } catch (InterruptedException e) + { + e.printStackTrace(); + } + }); + t.start(); + Thread.sleep(100); + assertTrue(t.getState() != Thread.State.TERMINATED); + assertEquals(0, queue.availableWeight.availablePermits()); + assertEquals(weighable, queue.poll()); + assertTrue(queue.availableWeight.availablePermits() > 0); + t.join(); + assertEquals(o, queue.poll()); + assertEquals(10, queue.availableWeight.availablePermits()); + } + + @Test + public void testPutWrappedQueueThrows() throws Exception + { + queue = new WeightedQueue<>(10, new BadQueue(false), WeightedQueue.NATURAL_WEIGHER); + assertEquals(10, queue.availableWeight.availablePermits()); + try + { + queue.put(new Object()); + fail(); + } + catch (UnsupportedOperationException e) + { + //expected and desired + } + assertEquals(10, queue.availableWeight.availablePermits()); + } + + @Test(expected = IllegalArgumentException.class) + public void testTryAcquireWeightIllegalWeight() + { + queue.tryAcquireWeight(weighable(-1)); + } + + @Test(expected = IllegalArgumentException.class) + public void testAcquireWeightIllegalWeight() throws Exception + { + queue.acquireWeight(weighable(-1), 1, TimeUnit.DAYS); + } + + @Test(expected = IllegalArgumentException.class) + public void testReleaseWeightIllegalWeight() + { + queue.releaseWeight(weighable(-1)); + } + + @Test + public void testTake() throws Exception + { + Thread t = new Thread(() -> { + try + { + queue.take(); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + }); + t.start(); + Thread.sleep(500); + assertTrue(t.getState() != Thread.State.TERMINATED); + assertEquals(10, queue.availableWeight.availablePermits()); + queue.offer(new Object()); + t.join(60 * 1000); + assertEquals(t.getState(), Thread.State.TERMINATED); + assertEquals(10, queue.availableWeight.availablePermits()); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructorLTZeroWeightThrows() throws Exception + { + new WeightedQueue(0); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructor2LTZeroWeightThrows() throws Exception + { + new WeightedQueue(0, new LinkedBlockingQueue<>(), WeightedQueue.NATURAL_WEIGHER); + } + + @Test(expected = NullPointerException.class) + public void testConstructorNullQueueThrows() throws Exception + { + new WeightedQueue(1, null, WeightedQueue.NATURAL_WEIGHER); + } + + @Test(expected = NullPointerException.class) + public void testConstructorNullWeigherThrows() throws Exception + { + new WeightedQueue(1, new LinkedBlockingQueue<>(), null); + } + + /** + * A blocking queue that throws or refuses on every method + */ + private static class BadQueue implements BlockingQueue<Object> + { + /** + * Refuse instead of throwing for some methods that have a boolean return value + */ + private boolean refuse = false; + + private BadQueue(boolean refuse) + { + this.refuse = refuse; + } + + @Override + public boolean add(Object o) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean offer(Object o) + { + if (refuse) + { + return false; + } + throw new UnsupportedOperationException(); + } + + @Override + public Object remove() + { + throw new UnsupportedOperationException(); + } + + @Override + public Object poll() + { + throw new UnsupportedOperationException(); + } + + @Override + public Object element() + { + throw new UnsupportedOperationException(); + } + + @Override + public Object peek() + { + throw new UnsupportedOperationException(); + } + + @Override + public void put(Object o) throws InterruptedException + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean offer(Object o, long timeout, TimeUnit unit) throws InterruptedException + { + if (refuse) + { + return false; + } + throw new UnsupportedOperationException(); + } + + @Override + public Object take() throws InterruptedException + { + throw new UnsupportedOperationException(); + } + + @Override + public Object poll(long timeout, TimeUnit unit) throws InterruptedException + { + throw new UnsupportedOperationException(); + } + + @Override + public int remainingCapacity() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addAll(Collection c) + { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection c) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection c) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection c) + { + throw new UnsupportedOperationException(); + } + + @Override + public int size() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isEmpty() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean contains(Object o) + { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator iterator() + { + throw new UnsupportedOperationException(); + } + + @Override + public Object[] toArray() + { + throw new UnsupportedOperationException(); + } + + @Override + public Object[] toArray(Object[] a) + { + throw new UnsupportedOperationException(); + } + + @Override + public int drainTo(Collection c) + { + throw new UnsupportedOperationException(); + } + + @Override + public int drainTo(Collection c, int maxElements) + { + throw new UnsupportedOperationException(); + } + }; + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org