Don't fail streams on failure detector downs. Patch by JoshuaMcKenzie; reviewed by marcuse for CASSANDRA-3569
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0f2d7d0b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0f2d7d0b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0f2d7d0b Branch: refs/heads/trunk Commit: 0f2d7d0b9540efa3ea3dfe4f8270c3635afdc63c Parents: 878990c Author: Marcus Eriksson <marc...@apache.org> Authored: Tue Jun 3 08:11:56 2014 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Tue Jun 3 08:11:56 2014 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + debian/cassandra-sysctl.conf | 1 + .../org/apache/cassandra/repair/RepairJob.java | 15 ++++++-- .../apache/cassandra/repair/RepairSession.java | 37 ++++++++++++++++---- .../cassandra/service/ActiveRepairService.java | 1 - .../cassandra/streaming/ConnectionHandler.java | 1 + .../cassandra/streaming/StreamSession.java | 22 ++---------- .../cassandra/streaming/StreamTransferTask.java | 12 +++++-- 8 files changed, 58 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 333606a..b5c2feb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,7 @@ 2.1.0 * Upgrade to Pig 0.12.1 (CASSANDRA-6556) * Make sure we clear out repair sessions from netstats (CASSANDRA-7329) + * Don't fail streams on failure detector downs (CASSANDRA-3569) Merged from 2.0: * Fix NPE in StreamTransferTask.createMessageForRetry() (CASSANDRA-7323) * Make StreamSession#closeSession() idempotent (CASSANDRA-7262) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/debian/cassandra-sysctl.conf ---------------------------------------------------------------------- diff --git a/debian/cassandra-sysctl.conf b/debian/cassandra-sysctl.conf index 
2173765..443e83f 100644 --- a/debian/cassandra-sysctl.conf +++ b/debian/cassandra-sysctl.conf @@ -1 +1,2 @@ vm.max_map_count = 1048575 +net.ipv4.tcp_keepalive_time=300 http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/repair/RepairJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index af00403..8057ed5 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -58,11 +58,21 @@ public class RepairJob /* Count down as sync completes */ private AtomicInteger waitForSync; + private final IRepairJobEventListener listener; + /** * Create repair job to run on specific columnfamily */ - public RepairJob(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential, ListeningExecutorService taskExecutor) + public RepairJob(IRepairJobEventListener listener, + UUID parentSessionId, + UUID sessionId, + String keyspace, + String columnFamily, + Range<Token> range, + boolean isSequential, + ListeningExecutorService taskExecutor) { + this.listener = listener; this.desc = new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range); this.isSequential = isSequential; this.taskExecutor = taskExecutor; @@ -114,7 +124,8 @@ public class RepairJob public void onFailure(Throwable throwable) { // TODO need to propagate error to RepairSession - logger.error("Error while snapshot", throwable); + logger.error("Error occurred during snapshot phase", throwable); + listener.failedSnapshot(); failed = true; } }, taskExecutor); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 507dafa..346f3f4 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -74,7 +74,9 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition; * Similarly, if a job is sequential, it will handle one Differencer at a time, but will handle * all of them in parallel otherwise. */ -public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener +public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, + IFailureDetectionEventListener, + IRepairJobEventListener { private static Logger logger = LoggerFactory.getLogger(RepairSession.class); @@ -89,9 +91,11 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan private volatile Exception exception; private final AtomicBoolean isFailed = new AtomicBoolean(false); + private final AtomicBoolean fdUnregistered = new AtomicBoolean(false); // First, all RepairJobs are added to this queue, final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<>(); + // and after receiving all validation, the job is moved to // this map, keyed by CF name. 
final Map<String, RepairJob> syncingJobs = new ConcurrentHashMap<>(); @@ -169,23 +173,32 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan assert job.desc.equals(desc); if (job.addTree(endpoint, tree) == 0) { - logger.debug("All response received for {}/{}", getId(), desc.columnFamily); + logger.debug("All responses received for {}/{}", getId(), desc.columnFamily); if (!job.isFailed()) { syncingJobs.put(job.desc.columnFamily, job); job.submitDifferencers(); } - // This job is complete, switching to next in line (note that only - // one thread will can ever do this) + // This job is complete, switching to next in line (note that only one thread will ever do this) jobs.poll(); RepairJob nextJob = jobs.peek(); if (nextJob == null) + { + // Unregister from FailureDetector once we've completed synchronizing Merkle trees. + // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down. + // See CASSANDRA-3569 + if (fdUnregistered.compareAndSet(false, true)) + FailureDetector.instance.unregisterFailureDetectionEventListener(this); + // We are done with this repair session as far as differencing // is considered. 
Just inform the session differencingDone.signalAll(); + } else + { nextJob.sendTreeRequests(endpoints); + } } } @@ -271,7 +284,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan // Create and queue a RepairJob for each column family for (String cfname : cfnames) { - RepairJob job = new RepairJob(parentRepairSession, id, keyspace, cfname, range, isSequential, taskExecutor); + RepairJob job = new RepairJob(this, parentRepairSession, id, keyspace, cfname, range, isSequential, taskExecutor); jobs.offer(job); } logger.debug("Sending tree requests to endpoints {}", endpoints); @@ -299,7 +312,13 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan { // mark this session as terminated terminate(); + ActiveRepairService.instance.removeFromActiveSessions(this); + + // If we've reached here in an exception state without completing Merkle Tree sync, we'll still be registered + // with the FailureDetector. + if (fdUnregistered.compareAndSet(false, true)) + FailureDetector.instance.unregisterFailureDetectionEventListener(this); } } @@ -320,11 +339,17 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan completed.signalAll(); } + public void failedSnapshot() + { + exception = new IOException("Failed during snapshot creation."); + forceShutdown(); + } + void failedNode(InetAddress remote) { String errorMsg = String.format("Endpoint %s died", remote); exception = new IOException(errorMsg); - // If a node failed, we stop everything (though there could still be some activity in the background) + // If a node failed during Merkle creation, we stop everything (though there could still be some activity in the background) forceShutdown(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index b300547..7f7325b 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -135,7 +135,6 @@ public class ActiveRepairService public void removeFromActiveSessions(RepairSession session) { - FailureDetector.instance.unregisterFailureDetectionEventListener(session); Gossiper.instance.unregister(session); sessions.remove(session.getId()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java index 5484c83..5716ae9 100644 --- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java +++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java @@ -123,6 +123,7 @@ public class ConnectionHandler { Socket socket = OutboundTcpConnectionPool.newSocket(peer); socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout()); + socket.setKeepAlive(true); return socket; } catch (IOException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 411f969..1afc07e 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -108,7 +108,7 @@ import org.apache.cassandra.utils.Pair; * session is done is is closed (closeSession()). 
Otherwise, the node switch to the WAIT_COMPLETE state and * send a CompleteMessage to the other side. */ -public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener +public class StreamSession implements IEndpointStateChangeSubscriber { private static final Logger logger = LoggerFactory.getLogger(StreamSession.class); public final InetAddress peer; @@ -181,10 +181,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe public void init(StreamResultFuture streamResult) { this.streamResult = streamResult; - - // register to gossiper/FD to fail on node failure - Gossiper.instance.register(this); - FailureDetector.instance.registerFailureDetectionEventListener(this); } public void start() @@ -358,8 +354,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe // incoming thread (so we would deadlock). handler.close(); - Gossiper.instance.unregister(this); - FailureDetector.instance.unregisterFailureDetectionEventListener(this); streamResult.handleSessionComplete(this); } } @@ -613,23 +607,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe public void onRemove(InetAddress endpoint) { - convict(endpoint, Double.MAX_VALUE); + closeSession(State.FAILED); } public void onRestart(InetAddress endpoint, EndpointState epState) { - convict(endpoint, Double.MAX_VALUE); - } - - public void convict(InetAddress endpoint, double phi) - { - if (!endpoint.equals(peer)) - return; - - // We want a higher confidence in the failure detection than usual because failing a streaming wrongly has a high cost. 
- if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold()) - return; - closeSession(State.FAILED); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java index 2fe75fa..48a7d89 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@ -20,6 +20,7 @@ package org.apache.cassandra.streaming; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.streaming.messages.OutgoingFileMessage; @@ -33,6 +34,7 @@ public class StreamTransferTask extends StreamTask private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); private final AtomicInteger sequenceNumber = new AtomicInteger(0); + private AtomicBoolean aborted = new AtomicBoolean(false); private final Map<Integer, OutgoingFileMessage> files = new ConcurrentHashMap<>(); @@ -75,11 +77,15 @@ public class StreamTransferTask extends StreamTask public void abort() { - for (OutgoingFileMessage file : files.values()) + // Prevent releasing reference multiple times + if (aborted.compareAndSet(false, true)) { - file.sstable.releaseReference(); + for (OutgoingFileMessage file : files.values()) + { + file.sstable.releaseReference(); + } + timeoutExecutor.shutdownNow(); } - timeoutExecutor.shutdownNow(); } public int getTotalNumberOfFiles()