Restore resumable hints delivery (backport of CASSANDRA-11960)
patch by Tommy Stendahl, Stefan Podkowinski and Branimir Lambov; reviewed by Aleksey Yeschenko for CASSANDRA-14419 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c4982587 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c4982587 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c4982587 Branch: refs/heads/trunk Commit: c4982587bfe3cb6946daa2912fe46146edef7fbf Parents: 1e478d3 Author: tommy stendahl <tommy.stend...@ericsson.com> Authored: Tue Jul 3 12:53:59 2018 +0200 Committer: Aleksey Yeshchenko <alek...@apple.com> Committed: Tue Jul 3 14:22:18 2018 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/hints/ChecksummedDataInput.java | 53 +++- .../hints/CompressedChecksummedDataInput.java | 53 +++- .../cassandra/hints/HintsDispatchExecutor.java | 21 +- .../apache/cassandra/hints/HintsDispatcher.java | 39 +-- .../org/apache/cassandra/hints/HintsReader.java | 32 +-- .../apache/cassandra/hints/HintsService.java | 27 +- .../org/apache/cassandra/hints/HintsStore.java | 14 +- .../apache/cassandra/hints/InputPosition.java | 26 ++ .../cassandra/hints/HintsCatalogTest.java | 31 +++ .../cassandra/hints/HintsCompressionTest.java | 17 ++ .../cassandra/hints/HintsServiceTest.java | 261 +++++++++++++++++++ 12 files changed, 511 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d694f3b..ee95718 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.17 + * Restore resumable hints delivery, backport CASSANDRA-11960 (CASSANDRA-14419) * Always close RT markers returned by ReadCommand#executeLocally() (CASSANDRA-14515) * Reverse order queries with range tombstones can cause data loss 
(CASSANDRA-14513) * Fix regression of lagging commitlog flush log message (CASSANDRA-14451) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java index 39f46a4..a78256b 100644 --- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java +++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java @@ -25,17 +25,18 @@ import java.util.zip.CRC32; import org.apache.cassandra.io.util.ChannelProxy; import org.apache.cassandra.io.util.DataPosition; import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.utils.NativeLibrary; /** - * A {@link RandomAccessReader} wrapper that calctulates the CRC in place. + * A {@link RandomAccessReader} wrapper that calculates the CRC in place. * * Useful for {@link org.apache.cassandra.hints.HintsReader}, for example, where we must verify the CRC, yet don't want * to allocate an extra byte array just that purpose. The CRC can be embedded in the input stream and checked via checkCrc(). * - * In addition to calculating the CRC, it allows to enforce a maximim known size. This is needed + * In addition to calculating the CRC, it allows to enforce a maximum known size. This is needed * so that {@link org.apache.cassandra.db.Mutation.MutationSerializer} doesn't blow up the heap when deserializing a * corrupted sequence by reading a huge corrupted length of bytes via - * via {@link org.apache.cassandra.utils.ByteBufferUtil#readWithLength(java.io.DataInput)}. + * {@link org.apache.cassandra.utils.ByteBufferUtil#readWithLength(java.io.DataInput)}. 
*/ public class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderWithOwnChannel { @@ -63,9 +64,37 @@ public class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderW return new Builder(new ChannelProxy(file)).build(); } - protected void releaseBuffer() + static class Position implements InputPosition { - super.releaseBuffer(); + final long sourcePosition; + + public Position(long sourcePosition) + { + super(); + this.sourcePosition = sourcePosition; + } + + @Override + public long subtract(InputPosition other) + { + return sourcePosition - ((Position)other).sourcePosition; + } + } + + /** + * Return a seekable representation of the current position. For compressed files this is chunk position + * in file and offset within chunk. + */ + public InputPosition getSeekPosition() + { + return new Position(getPosition()); + } + + public void seek(InputPosition pos) + { + updateCrc(); + bufferOffset = ((Position) pos).sourcePosition; + buffer.position(0).limit(0); } public void resetCrc() @@ -80,6 +109,15 @@ public class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderW limitMark = mark(); } + /** + * Returns the position in the source file, which is different from getPosition() for compressed/encrypted files + * and may be imprecise. 
+ */ + protected long getSourcePosition() + { + return bufferOffset; + } + public void resetLimit() { limit = Long.MAX_VALUE; @@ -141,6 +179,11 @@ public class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderW crcPosition = buffer.position(); } + public void tryUncacheRead() + { + NativeLibrary.trySkipCache(getChannel().getFileDescriptor(), 0, getSourcePosition(), getPath()); + } + private void updateCrc() { if (crcPosition == buffer.position() || crcUpdateDisabled) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java index cc4a6bd..c0de1cf 100644 --- a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java +++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java @@ -29,7 +29,8 @@ import org.apache.cassandra.utils.memory.BufferPool; public final class CompressedChecksummedDataInput extends ChecksummedDataInput { private final ICompressor compressor; - private volatile long filePosition = 0; + private volatile long filePosition = 0; // Current position in file, advanced when reading chunk. + private volatile long sourcePosition = 0; // Current position in file to report, advanced after consuming chunk. 
private volatile ByteBuffer compressedBuffer = null; private final ByteBuffer metadataBuffer = ByteBuffer.allocate(CompressedHintsWriter.METADATA_SIZE); @@ -39,7 +40,7 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput assert regions == null; //mmapped regions are not supported compressor = builder.compressor; - filePosition = builder.position; + sourcePosition = filePosition = builder.position; } /** @@ -51,8 +52,56 @@ public final class CompressedChecksummedDataInput extends ChecksummedDataInput return filePosition == channel.size() && buffer.remaining() == 0; } + public long getSourcePosition() + { + return sourcePosition; + } + + static class Position extends ChecksummedDataInput.Position + { + final long bufferStart; + final int bufferPosition; + + public Position(long sourcePosition, long bufferStart, int bufferPosition) + { + super(sourcePosition); + this.bufferStart = bufferStart; + this.bufferPosition = bufferPosition; + } + + @Override + public long subtract(InputPosition o) + { + Position other = (Position) o; + return bufferStart - other.bufferStart + bufferPosition - other.bufferPosition; + } + } + + public InputPosition getSeekPosition() + { + return new Position(sourcePosition, bufferOffset, buffer.position()); + } + + public void seek(InputPosition p) + { + Position pos = (Position) p; + bufferOffset = pos.bufferStart; + filePosition = pos.sourcePosition; + buffer.position(0).limit(0); + resetCrc(); + reBuffer(); + buffer.position(pos.bufferPosition); + assert sourcePosition == pos.sourcePosition; + assert bufferOffset == pos.bufferStart; + assert buffer.position() == pos.bufferPosition; + } + protected void reBufferStandard() { + sourcePosition = filePosition; + if (isEOF()) + return; + metadataBuffer.clear(); channel.read(metadataBuffer, filePosition); filePosition += CompressedHintsWriter.METADATA_SIZE; 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java index c9e69f5..d6c28d4 100644 --- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java +++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BooleanSupplier; +import java.util.function.Function; import java.util.function.Supplier; import com.google.common.util.concurrent.RateLimiter; @@ -48,12 +50,14 @@ final class HintsDispatchExecutor private final File hintsDirectory; private final ExecutorService executor; private final AtomicBoolean isPaused; + private final Function<InetAddress, Boolean> isAlive; private final Map<UUID, Future> scheduledDispatches; - HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused) + HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused, Function<InetAddress, Boolean> isAlive) { this.hintsDirectory = hintsDirectory; this.isPaused = isPaused; + this.isAlive = isAlive; scheduledDispatches = new ConcurrentHashMap<>(); executor = new JMXEnabledThreadPoolExecutor(maxThreads, 1, TimeUnit.MINUTES, @@ -69,6 +73,14 @@ final class HintsDispatchExecutor { scheduledDispatches.clear(); executor.shutdownNow(); + try + { + executor.awaitTermination(1, TimeUnit.MINUTES); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } } boolean isScheduled(HintsStore store) @@ -255,9 +267,10 @@ final class HintsDispatchExecutor private boolean deliver(HintsDescriptor descriptor, InetAddress address) { File file = new File(hintsDirectory, descriptor.fileName()); 
- Long offset = store.getDispatchOffset(descriptor).orElse(null); + InputPosition offset = store.getDispatchOffset(descriptor); - try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, address, descriptor.hostId, isPaused)) + BooleanSupplier shouldAbort = () -> !isAlive.apply(address) || isPaused.get(); + try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, address, descriptor.hostId, shouldAbort)) { if (offset != null) dispatcher.seek(offset); @@ -271,7 +284,7 @@ final class HintsDispatchExecutor } else { - store.markDispatchOffset(descriptor, dispatcher.dispatchOffset()); + store.markDispatchOffset(descriptor, dispatcher.dispatchPosition()); store.offerFirst(descriptor); logger.info("Finished hinted handoff of file {} to endpoint {}: {}, partially", descriptor.fileName(), address, hostId); return false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/src/java/org/apache/cassandra/hints/HintsDispatcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java index 351b3fa..db5f42f 100644 --- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java +++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java @@ -22,7 +22,7 @@ import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BooleanSupplier; import java.util.function.Function; import com.google.common.util.concurrent.RateLimiter; @@ -30,7 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; @@ -52,25 
+51,25 @@ final class HintsDispatcher implements AutoCloseable private final UUID hostId; private final InetAddress address; private final int messagingVersion; - private final AtomicBoolean isPaused; + private final BooleanSupplier abortRequested; - private long currentPageOffset; + private InputPosition currentPagePosition; - private HintsDispatcher(HintsReader reader, UUID hostId, InetAddress address, int messagingVersion, AtomicBoolean isPaused) + private HintsDispatcher(HintsReader reader, UUID hostId, InetAddress address, int messagingVersion, BooleanSupplier abortRequested) { - currentPageOffset = 0L; + currentPagePosition = null; this.reader = reader; this.hostId = hostId; this.address = address; this.messagingVersion = messagingVersion; - this.isPaused = isPaused; + this.abortRequested = abortRequested; } - static HintsDispatcher create(File file, RateLimiter rateLimiter, InetAddress address, UUID hostId, AtomicBoolean isPaused) + static HintsDispatcher create(File file, RateLimiter rateLimiter, InetAddress address, UUID hostId, BooleanSupplier abortRequested) { int messagingVersion = MessagingService.instance().getVersion(address); - return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, isPaused); + return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, abortRequested); } public void close() @@ -78,10 +77,9 @@ final class HintsDispatcher implements AutoCloseable reader.close(); } - void seek(long bytes) + void seek(InputPosition position) { - reader.seek(bytes); - currentPageOffset = 0L; + reader.seek(position); } /** @@ -91,7 +89,7 @@ final class HintsDispatcher implements AutoCloseable { for (HintsReader.Page page : reader) { - currentPageOffset = page.offset; + currentPagePosition = page.position; if (dispatch(page) != Action.CONTINUE) return false; } @@ -102,20 +100,11 @@ final class HintsDispatcher implements AutoCloseable /** * @return offset of the first 
non-delivered page */ - long dispatchOffset() + InputPosition dispatchPosition() { - return currentPageOffset; + return currentPagePosition; } - private boolean isHostAlive() - { - return FailureDetector.instance.isAlive(address); - } - - private boolean isPaused() - { - return isPaused.get(); - } // retry in case of a timeout; stop in case of a failure, host going down, or delivery paused private Action dispatch(HintsReader.Page page) @@ -156,7 +145,7 @@ final class HintsDispatcher implements AutoCloseable { while (hints.hasNext()) { - if (!isHostAlive() || isPaused()) + if (abortRequested.getAsBoolean()) return Action.ABORT; callbacks.add(sendFunction.apply(hints.next())); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/src/java/org/apache/cassandra/hints/HintsReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java b/src/java/org/apache/cassandra/hints/HintsReader.java index 973a15e..7003e04 100644 --- a/src/java/org/apache/cassandra/hints/HintsReader.java +++ b/src/java/org/apache/cassandra/hints/HintsReader.java @@ -109,7 +109,7 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> return descriptor; } - void seek(long newPosition) + void seek(InputPosition newPosition) { input.seek(newPosition); } @@ -126,21 +126,21 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> final class Page { - public final long offset; + public final InputPosition position; - private Page(long offset) + private Page(InputPosition inputPosition) { - this.offset = offset; + this.position = inputPosition; } Iterator<Hint> hintsIterator() { - return new HintsIterator(offset); + return new HintsIterator(position); } Iterator<ByteBuffer> buffersIterator() { - return new BuffersIterator(offset); + return new BuffersIterator(position); } } @@ -149,12 +149,12 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> 
@SuppressWarnings("resource") protected Page computeNext() { - NativeLibrary.trySkipCache(input.getChannel().getFileDescriptor(), 0, input.getFilePointer(), input.getPath()); + input.tryUncacheRead(); if (input.isEOF()) return endOfData(); - return new Page(input.getFilePointer()); + return new Page(input.getSeekPosition()); } } @@ -163,9 +163,9 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> */ final class HintsIterator extends AbstractIterator<Hint> { - private final long offset; + private final InputPosition offset; - HintsIterator(long offset) + HintsIterator(InputPosition offset) { super(); this.offset = offset; @@ -177,12 +177,12 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> do { - long position = input.getFilePointer(); + InputPosition position = input.getSeekPosition(); if (input.isEOF()) return endOfData(); // reached EOF - if (position - offset >= PAGE_SIZE) + if (position.subtract(offset) >= PAGE_SIZE) return endOfData(); // read page size or more bytes try @@ -260,9 +260,9 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> */ final class BuffersIterator extends AbstractIterator<ByteBuffer> { - private final long offset; + private final InputPosition offset; - BuffersIterator(long offset) + BuffersIterator(InputPosition offset) { super(); this.offset = offset; @@ -274,12 +274,12 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> do { - long position = input.getFilePointer(); + InputPosition position = input.getSeekPosition(); if (input.isEOF()) return endOfData(); // reached EOF - if (position - offset >= PAGE_SIZE) + if (position.subtract(offset) >= PAGE_SIZE) return endOfData(); // read page size or more bytes try http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/src/java/org/apache/cassandra/hints/HintsService.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java index 268ee1f..7ec359d 100644 --- a/src/java/org/apache/cassandra/hints/HintsService.java +++ b/src/java/org/apache/cassandra/hints/HintsService.java @@ -30,6 +30,7 @@ import java.util.function.Supplier; import javax.management.MBeanServer; import javax.management.ObjectName; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +38,8 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.metrics.HintedHandoffMetrics; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.dht.Token; @@ -60,7 +63,7 @@ public final class HintsService implements HintsServiceMBean { private static final Logger logger = LoggerFactory.getLogger(HintsService.class); - public static final HintsService instance = new HintsService(); + public static HintsService instance = new HintsService(); private static final String MBEAN_NAME = "org.apache.cassandra.hints:type=HintsService"; @@ -82,6 +85,12 @@ public final class HintsService implements HintsServiceMBean private HintsService() { + this(FailureDetector.instance); + } + + @VisibleForTesting + HintsService(IFailureDetector failureDetector) + { File hintsDirectory = DatabaseDescriptor.getHintsDirectory(); int maxDeliveryThreads = DatabaseDescriptor.getMaxHintsDeliveryThreads(); @@ -92,7 +101,7 @@ public final class HintsService implements HintsServiceMBean bufferPool = new HintsBufferPool(bufferSize, writeExecutor::flushBuffer); isDispatchPaused = new AtomicBoolean(true); - dispatchExecutor = new 
HintsDispatchExecutor(hintsDirectory, maxDeliveryThreads, isDispatchPaused); + dispatchExecutor = new HintsDispatchExecutor(hintsDirectory, maxDeliveryThreads, isDispatchPaused, failureDetector::isAlive); // periodically empty the current content of the buffers int flushPeriod = DatabaseDescriptor.getHintsFlushPeriodInMS(); @@ -225,7 +234,7 @@ public final class HintsService implements HintsServiceMBean * Will abort dispatch sessions that are currently in progress (which is okay, it's idempotent), * and make sure the buffers are flushed, hints files written and fsynced. */ - public synchronized void shutdownBlocking() + public synchronized void shutdownBlocking() throws ExecutionException, InterruptedException { if (isShutDown) throw new IllegalStateException("HintsService has already been shut down"); @@ -237,8 +246,8 @@ public final class HintsService implements HintsServiceMBean triggerFlushingFuture.cancel(false); - writeExecutor.flushBufferPool(bufferPool); - writeExecutor.closeAllWriters(); + writeExecutor.flushBufferPool(bufferPool).get(); + writeExecutor.closeAllWriters().get(); dispatchExecutor.shutdownBlocking(); writeExecutor.shutdownBlocking(); @@ -370,4 +379,12 @@ public final class HintsService implements HintsServiceMBean { return catalog; } + + /** + * Returns true in case service is shut down. 
+ */ + public boolean isShutDown() + { + return isShutDown; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/src/java/org/apache/cassandra/hints/HintsStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java b/src/java/org/apache/cassandra/hints/HintsStore.java index bb3aa0f..032de5a 100644 --- a/src/java/org/apache/cassandra/hints/HintsStore.java +++ b/src/java/org/apache/cassandra/hints/HintsStore.java @@ -50,7 +50,7 @@ final class HintsStore private final File hintsDirectory; private final ImmutableMap<String, Object> writerParams; - private final Map<HintsDescriptor, Long> dispatchOffsets; + private final Map<HintsDescriptor, InputPosition> dispatchPositions; private final Deque<HintsDescriptor> dispatchDequeue; private final Queue<HintsDescriptor> blacklistedFiles; @@ -64,7 +64,7 @@ final class HintsStore this.hintsDirectory = hintsDirectory; this.writerParams = writerParams; - dispatchOffsets = new ConcurrentHashMap<>(); + dispatchPositions = new ConcurrentHashMap<>(); dispatchDequeue = new ConcurrentLinkedDeque<>(descriptors); blacklistedFiles = new ConcurrentLinkedQueue<>(); @@ -143,19 +143,19 @@ final class HintsStore return !dispatchDequeue.isEmpty(); } - Optional<Long> getDispatchOffset(HintsDescriptor descriptor) + InputPosition getDispatchOffset(HintsDescriptor descriptor) { - return Optional.ofNullable(dispatchOffsets.get(descriptor)); + return dispatchPositions.get(descriptor); } - void markDispatchOffset(HintsDescriptor descriptor, long mark) + void markDispatchOffset(HintsDescriptor descriptor, InputPosition inputPosition) { - dispatchOffsets.put(descriptor, mark); + dispatchPositions.put(descriptor, inputPosition); } void cleanUp(HintsDescriptor descriptor) { - dispatchOffsets.remove(descriptor); + dispatchPositions.remove(descriptor); } void blacklist(HintsDescriptor descriptor) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/src/java/org/apache/cassandra/hints/InputPosition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/InputPosition.java b/src/java/org/apache/cassandra/hints/InputPosition.java new file mode 100644 index 0000000..0b8953c --- /dev/null +++ b/src/java/org/apache/cassandra/hints/InputPosition.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.hints; + +/** + * Interface for file positions as provided by the various ChecksummedDataInput implementations. 
+ */ +public interface InputPosition +{ + long subtract(InputPosition other); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java index 928fd31..dcd31cf 100644 --- a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java +++ b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java @@ -152,6 +152,37 @@ public class HintsCatalogTest assertEquals(0, store.getDispatchQueueSize()); } + @Test + public void deleteHintsTest() throws IOException + { + File directory = Files.createTempDirectory(null).toFile(); + UUID hostId1 = UUID.randomUUID(); + UUID hostId2 = UUID.randomUUID(); + long now = System.currentTimeMillis(); + writeDescriptor(directory, new HintsDescriptor(hostId1, now)); + writeDescriptor(directory, new HintsDescriptor(hostId1, now+1)); + writeDescriptor(directory, new HintsDescriptor(hostId2, now+2)); + writeDescriptor(directory, new HintsDescriptor(hostId2, now+3)); + + // load catalog containing two stores (one for each host) + HintsCatalog catalog = HintsCatalog.load(directory, ImmutableMap.of()); + assertEquals(2, catalog.stores().count()); + assertTrue(catalog.hasFiles()); + + // delete all hints from store 1 + assertTrue(catalog.get(hostId1).hasFiles()); + catalog.deleteAllHints(hostId1); + assertFalse(catalog.get(hostId1).hasFiles()); + // stores are still kept for each host, even after deleting hints + assertEquals(2, catalog.stores().count()); + assertTrue(catalog.hasFiles()); + + // delete all hints from all stores + catalog.deleteAllHints(); + assertEquals(2, catalog.stores().count()); + assertFalse(catalog.hasFiles()); + } + @SuppressWarnings("EmptyTryBlock") private static void writeDescriptor(File directory, HintsDescriptor descriptor) throws IOException { 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java index d6a08ca..656d7cd 100644 --- a/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java +++ b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java @@ -117,9 +117,11 @@ public class HintsCompressionTest try (HintsReader reader = HintsReader.open(new File(dir, descriptor.fileName()))) { List<Hint> deserialized = new ArrayList<>(hintNum); + List<InputPosition> pagePositions = new ArrayList<>(hintNum); for (HintsReader.Page page: reader) { + pagePositions.add(page.position); Iterator<Hint> iterator = page.hintsIterator(); while (iterator.hasNext()) { @@ -134,6 +136,21 @@ public class HintsCompressionTest HintsTestUtil.assertHintsEqual(expected, deserialized.get(hintNum)); hintNum++; } + + // explicitly seek to each page by iterating collected page positions and check if hints still match as expected + int hintOffset = 0; + for (InputPosition pos : pagePositions) + { + reader.seek(pos); + HintsReader.Page page = reader.iterator().next(); + Iterator<Hint> iterator = page.hintsIterator(); + while (iterator.hasNext()) + { + Hint seekedHint = iterator.next(); + HintsTestUtil.assertHintsEqual(hints.get(hintOffset), seekedHint); + hintOffset++; + } + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4982587/test/unit/org/apache/cassandra/hints/HintsServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java new file mode 100644 index 0000000..ab1cbd0 --- /dev/null +++ b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java @@ -0,0 +1,261 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.hints; + +import java.net.InetAddress; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.annotation.Nullable; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.datastax.driver.core.utils.MoreFutures; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.UpdateBuilder; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.gms.IFailureDetectionEventListener; +import org.apache.cassandra.gms.IFailureDetector; +import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.MockMessagingService; +import 
org.apache.cassandra.net.MockMessagingSpy; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.Util.dk; +import static org.apache.cassandra.net.MockMessagingService.verb; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class HintsServiceTest +{ + private static final String KEYSPACE = "hints_service_test"; + private static final String TABLE = "table"; + + private final MockFailureDetector failureDetector = new MockFailureDetector(); + + @BeforeClass + public static void defineSchema() + { + SchemaLoader.prepareServer(); + StorageService.instance.initServer(); + SchemaLoader.createKeyspace(KEYSPACE, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE, TABLE)); + } + + @After + public void cleanup() + { + MockMessagingService.cleanup(); + } + + @Before + public void reinstanciateService() throws ExecutionException, InterruptedException + { + MessagingService.instance().clearMessageSinks(); + + if (!HintsService.instance.isShutDown()) + { + HintsService.instance.shutdownBlocking(); + HintsService.instance.deleteAllHints(); + } + + failureDetector.isAlive = true; + HintsService.instance = new HintsService(failureDetector); + HintsService.instance.startDispatch(); + } + + @Test + public void testDispatchHints() throws InterruptedException, ExecutionException + { + long cnt = StorageMetrics.totalHints.getCount(); + + // create spy for hint messages + MockMessagingSpy spy = sendHintsAndResponses(100, -1); + + // metrics should have been updated with number of created hints + assertEquals(cnt + 100, StorageMetrics.totalHints.getCount()); + + // wait until hints have been sent + spy.interceptMessageOut(100).get(); + spy.interceptNoMsg(500, TimeUnit.MILLISECONDS).get(); + } + + @Test + public void testPauseAndResume() throws InterruptedException, ExecutionException + { + 
HintsService.instance.pauseDispatch(); + + // create spy for hint messages + MockMessagingSpy spy = sendHintsAndResponses(100, -1); + + // we should not send any hints while paused + ListenableFuture<Boolean> noMessagesWhilePaused = spy.interceptNoMsg(15, TimeUnit.SECONDS); + Futures.addCallback(noMessagesWhilePaused, new MoreFutures.SuccessCallback<Boolean>() + { + public void onSuccess(@Nullable Boolean aBoolean) + { + HintsService.instance.resumeDispatch(); + } + }); + + Futures.allAsList( + noMessagesWhilePaused, + spy.interceptMessageOut(100), + spy.interceptNoMsg(200, TimeUnit.MILLISECONDS) + ).get(); + } + + @Test + public void testPageRetry() throws InterruptedException, ExecutionException, TimeoutException + { + // create spy for hint messages, but only create responses for 5 hints + MockMessagingSpy spy = sendHintsAndResponses(20, 5); + + Futures.allAsList( + // the dispatcher will always send all hints within the current page + // and only wait for the acks before going to the next page + spy.interceptMessageOut(20), + spy.interceptNoMsg(200, TimeUnit.MILLISECONDS), + + // next tick will trigger a retry of the same page as we only replied with 5/20 acks + spy.interceptMessageOut(20) + ).get(); + + // marking the destination node as dead should stop sending hints + failureDetector.isAlive = false; + spy.interceptNoMsg(20, TimeUnit.SECONDS).get(); + } + + @Test + public void testPageSeek() throws InterruptedException, ExecutionException + { + // create spy for hint messages, stop replying after 12k (should be on 3rd page) + MockMessagingSpy spy = sendHintsAndResponses(20000, 12000); + + // At this point the dispatcher will constantly retry the page we stopped acking, + // thus we receive the same hints from the page multiple times and in total more than + // all written hints. Lets just consume them for a while and then pause the dispatcher. 
+ spy.interceptMessageOut(22000).get(); + HintsService.instance.pauseDispatch(); + Thread.sleep(1000); + + // verify that we have a dispatch offset set for the page we're currently stuck at + HintsStore store = HintsService.instance.getCatalog().get(StorageService.instance.getLocalHostUUID()); + HintsDescriptor descriptor = store.poll(); + store.offerFirst(descriptor); // add again for cleanup during re-instanciation + InputPosition dispatchOffset = store.getDispatchOffset(descriptor); + assertTrue(dispatchOffset != null); + assertTrue(((ChecksummedDataInput.Position) dispatchOffset).sourcePosition > 0); + } + + private MockMessagingSpy sendHintsAndResponses(int noOfHints, int noOfResponses) + { + // create spy for hint messages, but only create responses for noOfResponses hints + MessageIn<HintResponse> messageIn = MessageIn.create(FBUtilities.getBroadcastAddress(), + HintResponse.instance, + Collections.emptyMap(), + MessagingService.Verb.REQUEST_RESPONSE, + MessagingService.current_version); + + MockMessagingSpy spy; + if (noOfResponses != -1) + { + spy = MockMessagingService.when(verb(MessagingService.Verb.HINT)).respondN(messageIn, noOfResponses); + } + else + { + spy = MockMessagingService.when(verb(MessagingService.Verb.HINT)).respond(messageIn); + } + + // create and write noOfHints using service + UUID hostId = StorageService.instance.getLocalHostUUID(); + for (int i = 0; i < noOfHints; i++) + { + long now = System.currentTimeMillis(); + DecoratedKey dkey = dk(String.valueOf(i)); + CFMetaData cfMetaData = Schema.instance.getCFMetaData(KEYSPACE, TABLE); + + UpdateBuilder builder = UpdateBuilder.create(cfMetaData, dkey) + .withTimestamp(now) + .newRow("column0") + .add("val", "value0"); + Hint hint = Hint.create((Mutation) builder.makeMutation(), now); + + HintsService.instance.write(hostId, hint); + } + return spy; + } + + private static class MockFailureDetector implements IFailureDetector + { + private boolean isAlive = true; + + public boolean 
isAlive(InetAddress ep) + { + return isAlive; + } + + public void interpret(InetAddress ep) + { + throw new UnsupportedOperationException(); + } + + public void report(InetAddress ep) + { + throw new UnsupportedOperationException(); + } + + public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener) + { + throw new UnsupportedOperationException(); + } + + public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) + { + throw new UnsupportedOperationException(); + } + + public void remove(InetAddress ep) + { + throw new UnsupportedOperationException(); + } + + public void forceConviction(InetAddress ep) + { + throw new UnsupportedOperationException(); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org