Repository: cassandra
Updated Branches:
  refs/heads/trunk 6f5c7a4ea -> 375465d1c
Transfer hints to a different node on decommission patch by Marcus Eriksson; reviewed by Aleksey Yeschenko for CASSANDRA-10198 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/959b96ef Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/959b96ef Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/959b96ef Branch: refs/heads/trunk Commit: 959b96efee613363fde28de1e2b34aa9201efd7a Parents: a29d206 Author: Marcus Eriksson <marc...@apache.org> Authored: Tue Sep 1 15:37:37 2015 +0200 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Fri Sep 18 19:14:41 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/hints/HintsCatalog.java | 8 +++ .../cassandra/hints/HintsDispatchExecutor.java | 64 +++++++++++++++++++- .../apache/cassandra/hints/HintsDispatcher.java | 4 +- .../apache/cassandra/hints/HintsService.java | 45 ++++++++++++-- .../cassandra/service/StorageService.java | 29 ++++----- 6 files changed, 124 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/959b96ef/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 775d8a4..d95d833 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.0-rc1 + * Transfer hints to a different node on decommission (CASSANDRA-10198) * Check partition keys for CAS operations during stmt validation (CASSANDRA-10338) * Add custom query expressions to SELECT (CASSANDRA-10217) * Fix minor bugs in MV handling (CASSANDRA-10362) http://git-wip-us.apache.org/repos/asf/cassandra/blob/959b96ef/src/java/org/apache/cassandra/hints/HintsCatalog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsCatalog.java 
b/src/java/org/apache/cassandra/hints/HintsCatalog.java index 13404ee..cb8e1fd 100644 --- a/src/java/org/apache/cassandra/hints/HintsCatalog.java +++ b/src/java/org/apache/cassandra/hints/HintsCatalog.java @@ -110,6 +110,14 @@ final class HintsCatalog store.deleteAllHints(); } + /** + * @return true if at least one of the stores has a file pending dispatch + */ + boolean hasFiles() + { + return stores().anyMatch(HintsStore::hasFiles); + } + void exciseStore(UUID hostId) { deleteAllHints(hostId); http://git-wip-us.apache.org/repos/asf/cassandra/blob/959b96ef/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java index ab7bc7f..1f0191a 100644 --- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java +++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import com.google.common.util.concurrent.RateLimiter; @@ -93,7 +94,12 @@ final class HintsDispatchExecutor * * It also simplifies reasoning about dispatch sessions. 
*/ - return scheduledDispatches.computeIfAbsent(store.hostId, uuid -> executor.submit(new DispatchHintsTask(store, hostId))); + return scheduledDispatches.computeIfAbsent(hostId, uuid -> executor.submit(new DispatchHintsTask(store, hostId))); + } + + Future transfer(HintsCatalog catalog, Supplier<UUID> hostIdSupplier) + { + return executor.submit(new TransferHintsTask(catalog, hostIdSupplier)); } void completeDispatchBlockingly(HintsStore store) @@ -110,6 +116,60 @@ final class HintsDispatchExecutor } } + private final class TransferHintsTask implements Runnable + { + private final HintsCatalog catalog; + + /* + * Supplies target hosts to stream to. Generally returns the one the DynamicSnitch thinks is closest. + * We use a supplier here to be able to get a new host if the current one dies during streaming. + */ + private final Supplier<UUID> hostIdSupplier; + + private TransferHintsTask(HintsCatalog catalog, Supplier<UUID> hostIdSupplier) + { + this.catalog = catalog; + this.hostIdSupplier = hostIdSupplier; + } + + @Override + public void run() + { + UUID hostId = hostIdSupplier.get(); + logger.info("Transferring all hints to {}", hostId); + if (transfer(hostId)) + return; + + logger.warn("Failed to transfer all hints to {}; will retry in {} seconds", hostId, 10); + + try + { + TimeUnit.SECONDS.sleep(10); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + hostId = hostIdSupplier.get(); + logger.info("Transferring all hints to {}", hostId); + if (!transfer(hostId)) + { + logger.error("Failed to transfer all hints to {}", hostId); + throw new RuntimeException("Failed to transfer all hints to " + hostId); + } + } + + private boolean transfer(UUID hostId) + { + catalog.stores() + .map(store -> new DispatchHintsTask(store, hostId)) + .forEach(Runnable::run); + + return !catalog.hasFiles(); + } + } + private final class DispatchHintsTask implements Runnable { private final HintsStore store; @@ -179,7 +239,7 @@ final class 
HintsDispatchExecutor File file = new File(hintsDirectory, descriptor.fileName()); Long offset = store.getDispatchOffset(descriptor).orElse(null); - try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, hostId, isPaused)) + try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, hostId, descriptor.hostId, isPaused)) { if (offset != null) dispatcher.seek(offset); http://git-wip-us.apache.org/repos/asf/cassandra/blob/959b96ef/src/java/org/apache/cassandra/hints/HintsDispatcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java index f769e09..94a6669 100644 --- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java +++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java @@ -64,11 +64,11 @@ final class HintsDispatcher implements AutoCloseable this.isPaused = isPaused; } - static HintsDispatcher create(File file, RateLimiter rateLimiter, UUID hostId, AtomicBoolean isPaused) + static HintsDispatcher create(File file, RateLimiter rateLimiter, UUID hostId, UUID hintFor, AtomicBoolean isPaused) { InetAddress address = StorageService.instance.getEndpointForHostId(hostId); int messagingVersion = MessagingService.instance().getVersion(address); - return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, isPaused); + return new HintsDispatcher(HintsReader.open(file, rateLimiter), hintFor, address, messagingVersion, isPaused); } public void close() http://git-wip-us.apache.org/repos/asf/cassandra/blob/959b96ef/src/java/org/apache/cassandra/hints/HintsService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java index 3f30c1d..6aed07f 100644 --- 
a/src/java/org/apache/cassandra/hints/HintsService.java +++ b/src/java/org/apache/cassandra/hints/HintsService.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.UUID; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -202,11 +203,6 @@ public final class HintsService implements HintsServiceMBean writeExecutor.shutdownBlocking(); } - public void decommission() - { - resumeDispatch(); - } - /** * Deletes all hints for all destinations. Doesn't make snapshots - should be used with care. */ @@ -288,4 +284,43 @@ public final class HintsService implements HintsServiceMBean // delete all the hints files and remove the HintsStore instance from the map in the catalog catalog.exciseStore(hostId); } + + /** + * Transfer all local hints to the hostId supplied by hostIdSupplier + * + * Flushes the buffer to make sure all hints are on disk and closes the hint writers + * so we don't leave any hint files around. + * + * After that, we serially dispatch all the hints in the HintsCatalog. + * + * If we fail delivering all hints, we will ask the hostIdSupplier for a new target host + * and retry delivering any remaining hints there, once, with a delay of 10 seconds before retrying. + * + * @param hostIdSupplier supplier of stream target host ids. 
This is generally + * the closest one according to the DynamicSnitch + * @return When this future is done, it either has streamed all hints to remote nodes or has failed with a proper + * log message + */ + public Future transferHints(Supplier<UUID> hostIdSupplier) + { + Future flushFuture = writeExecutor.flushBufferPool(bufferPool); + Future closeFuture = writeExecutor.closeAllWriters(); + try + { + flushFuture.get(); + closeFuture.get(); + } + catch (InterruptedException | ExecutionException e) + { + throw new RuntimeException(e); + } + + // unpause dispatch, or else transfer() will return immediately + resumeDispatch(); + + // wait for the current dispatch session to end + catalog.stores().forEach(dispatchExecutor::completeDispatchBlockingly); + + return dispatchExecutor.transfer(catalog, hostIdSupplier); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/959b96ef/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index dd428b6..0a8717f 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3438,7 +3438,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE setMode(Mode.LEAVING, "streaming hints to other nodes", true); - Future<StreamState> hintsSuccess = streamHints(); + Future hintsSuccess = streamHints(); // wait for the transfer runnables to signal the latch. 
logger.debug("waiting for stream acks."); @@ -3456,13 +3456,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE onFinish.run(); } - private Future<StreamState> streamHints() + private Future streamHints() { - // StreamPlan will not fail if there are zero files to transfer, so flush anyway (need to get any in-memory hints, as well) - ColumnFamilyStore hintsCF = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS); - FBUtilities.waitOnFuture(hintsCF.forceFlush()); + return HintsService.instance.transferHints(this::getPreferredHintsStreamTarget); + } - // gather all live nodes in the cluster that aren't also leaving + /** + * Find the best target to stream hints to. Currently the closest peer according to the snitch + */ + private UUID getPreferredHintsStreamTarget() + { List<InetAddress> candidates = new ArrayList<>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints()); candidates.remove(FBUtilities.getBroadcastAddress()); for (Iterator<InetAddress> iter = candidates.iterator(); iter.hasNext(); ) @@ -3475,7 +3478,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (candidates.isEmpty()) { logger.warn("Unable to stream hints since no live endpoints seen"); - return Futures.immediateFuture(null); + throw new RuntimeException("Unable to stream hints since no live endpoints seen"); } else { @@ -3483,17 +3486,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), candidates); InetAddress hintsDestinationHost = candidates.get(0); InetAddress preferred = SystemKeyspace.getPreferredIP(hintsDestinationHost); - - // stream all hints -- range list will be a singleton of "the entire ring" - Token token = tokenMetadata.partitioner.getMinimumToken(); - List<Range<Token>> ranges = Collections.singletonList(new Range<>(token, token)); 
- - return new StreamPlan("Hints").transferRanges(hintsDestinationHost, - preferred, - SystemKeyspace.NAME, - ranges, - SystemKeyspace.LEGACY_HINTS) - .execute(); + return tokenMetadata.getHostId(preferred); } }