Fix hanging stream session by preventing CompressedStreamReader from blocking on IOException. Also remove retry support from streaming.
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-10992 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/76e3100f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/76e3100f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/76e3100f Branch: refs/heads/cassandra-3.9 Commit: 76e3100ffb106cab3cc665404e293c1026e5e65c Parents: bc9af92 Author: Paulo Motta <pauloricard...@gmail.com> Authored: Thu Jun 23 11:33:54 2016 -0300 Committer: Yuki Morishita <yu...@apache.org> Committed: Tue Aug 9 16:31:34 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/config/Config.java | 4 ++ .../cassandra/config/DatabaseDescriptor.java | 5 -- .../cassandra/streaming/StreamReader.java | 26 +---------- .../cassandra/streaming/StreamSession.java | 36 +------------- .../compress/CompressedInputStream.java | 21 ++++++++- .../compress/CompressedStreamReader.java | 10 ++-- .../streaming/messages/IncomingFileMessage.java | 22 ++------- .../streaming/messages/RetryMessage.java | 4 ++ .../org/apache/cassandra/utils/Throwables.java | 17 +++++++ .../compress/CompressedInputStreamTest.java | 49 +++++++++++++++++--- 11 files changed, 102 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f734476..232203e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.8 + * Fix hanging stream session (CASSANDRA-10992) * Add byteman support for testing (CASSANDRA-12377) * Fix INSERT JSON, fromJson() support of smallint, tinyint types (CASSANDRA-12371) * Restore JVM metric export for metric reporters (CASSANDRA-12312) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index ede4560..60daee6 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -171,6 +171,10 @@ public class Config public volatile Integer compaction_throughput_mb_per_sec = 16; public volatile Integer compaction_large_partition_warning_threshold_mb = 100; + /** + * @deprecated retry support removed on CASSANDRA-10992 + */ + @Deprecated public Integer max_streaming_retries = 3; public volatile Integer stream_throughput_outbound_megabits_per_sec = 200; http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index f1acfc4..6e46725 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -957,11 +957,6 @@ public class DatabaseDescriptor return conf.cluster_name; } - public static int getMaxStreamingRetries() - { - return conf.max_streaming_retries; - } - public static int getStoragePort() { return Integer.parseInt(System.getProperty("cassandra.storage_port", conf.storage_port.toString())); http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java index 8789720..c96ea22 100644 --- 
a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@ -45,6 +45,7 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.BytesReadTracker; import org.apache.cassandra.utils.Pair; +import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause; /** * StreamReader reads from stream and writes to SSTable. @@ -137,11 +138,7 @@ public class StreamReader e.addSuppressed(e2); } } - drain(dis, in.getBytesRead()); - if (e instanceof IOException) - throw (IOException) e; - else - throw Throwables.propagate(e); + throw Throwables.propagate(e); } } @@ -155,25 +152,6 @@ public class StreamReader return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel); } - protected void drain(InputStream dis, long bytesRead) throws IOException - { - long toSkip = totalSize() - bytesRead; - - // InputStream.skip can return -1 if dis is inaccessible. - long skipped = dis.skip(toSkip); - if (skipped == -1) - return; - - toSkip = toSkip - skipped; - while (toSkip > 0) - { - skipped = dis.skip(toSkip); - if (skipped == -1) - break; - toSkip = toSkip - skipped; - } - } - protected long totalSize() { long size = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 294b9c1..0f43f1f 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -105,10 +105,8 @@ import org.apache.cassandra.utils.concurrent.Refs; * complete (received()). 
When all files for the StreamReceiveTask have been received, the sstables * are added to the CFS (and 2ndary index are built, StreamReceiveTask.complete()) and the task * is marked complete (taskCompleted()) - * (b) If during the streaming of a particular file an I/O error occurs on the receiving end of a stream - * (FileMessage.deserialize), the node will retry the file (up to DatabaseDescriptor.getMaxStreamingRetries()) - * by sending a RetryMessage to the sender. On receiving a RetryMessage, the sender simply issue a new - * FileMessage for that file. + * (b) If during the streaming of a particular file an error occurs on the receiving end of a stream + * (FileMessage.deserialize), the node will send a SessionFailedMessage to the sender and close the stream session. * (c) When all transfer and receive tasks for a session are complete, the move to the Completion phase * (maybeCompleted()). * @@ -149,8 +147,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber public final ConnectionHandler handler; - private int retries; - private AtomicBoolean isAborted = new AtomicBoolean(false); private final boolean keepSSTableLevel; private final boolean isIncremental; @@ -481,11 +477,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber received(received.cfId, received.sequenceNumber); break; - case RETRY: - RetryMessage retry = (RetryMessage) message; - retry(retry.cfId, retry.sequenceNumber); - break; - case COMPLETE: complete(); break; @@ -610,18 +601,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber } /** - * Call back on receiving {@code StreamMessage.Type.RETRY} message. 
- * - * @param cfId ColumnFamily ID - * @param sequenceNumber Sequence number to indicate which file to stream again - */ - public void retry(UUID cfId, int sequenceNumber) - { - OutgoingFileMessage message = transfers.get(cfId).createMessageForRetry(sequenceNumber); - handler.sendMessage(message); - } - - /** * Check if session is completed on receiving {@code StreamMessage.Type.COMPLETE} message. */ public synchronized void complete() @@ -651,17 +630,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber closeSession(State.FAILED); } - public void doRetry(FileMessageHeader header, Throwable e) - { - logger.warn("[Stream #{}] Retrying for following error", planId(), e); - // retry - retries++; - if (retries > DatabaseDescriptor.getMaxStreamingRetries()) - onError(new IOException("Too many retries for " + header, e)); - else - handler.sendMessage(new RetryMessage(header.cfId, header.sequenceNumber)); - } - /** * @return Current snapshot of this session info. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java index 489fed9..d08ffa9 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java @@ -63,8 +63,17 @@ public class CompressedInputStream extends InputStream // raw checksum bytes private final byte[] checksumBytes = new byte[4]; + /** + * Indicates there was a problem when reading from source stream. 
+ * When this is added to the <code>dataBuffer</code> by the stream Reader, + * it is expected that the <code>readException</code> variable is populated + * with the cause of the error when reading from source stream, so it is + * thrown to the consumer on subsequent read operation. + */ private static final byte[] POISON_PILL = new byte[0]; + protected volatile IOException readException = null; + private long totalCompressedBytesRead; /** @@ -84,13 +93,19 @@ public class CompressedInputStream extends InputStream public int read() throws IOException { + if (readException != null) + throw readException; + if (current >= bufferOffset + buffer.length || validBufferBytes == -1) { try { byte[] compressedWithCRC = dataBuffer.take(); if (compressedWithCRC == POISON_PILL) - throw new EOFException("No chunk available"); + { + assert readException != null; + throw readException; + } decompress(compressedWithCRC); } catch (InterruptedException e) @@ -138,7 +153,7 @@ public class CompressedInputStream extends InputStream return totalCompressedBytesRead; } - static class Reader extends WrappedRunnable + class Reader extends WrappedRunnable { private final InputStream source; private final Iterator<CompressionMetadata.Chunk> chunks; @@ -169,6 +184,7 @@ public class CompressedInputStream extends InputStream int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead); if (r < 0) { + readException = new EOFException("No chunk available"); dataBuffer.put(POISON_PILL); return; // throw exception where we consume dataBuffer } @@ -177,6 +193,7 @@ public class CompressedInputStream extends InputStream catch (IOException e) { logger.warn("Error while reading compressed input stream.", e); + readException = e; dataBuffer.put(POISON_PILL); return; // throw exception where we consume dataBuffer } http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index c684e4f..fa1022d 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@ -44,6 +44,8 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.BytesReadTracker; import org.apache.cassandra.utils.Pair; +import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause; + /** * StreamReader that reads from streamed compressed SSTable */ @@ -132,11 +134,9 @@ public class CompressedStreamReader extends StreamReader e.addSuppressed(e2); } } - drain(cis, in.getBytesRead()); - if (e instanceof IOException) - throw (IOException) e; - else - throw Throwables.propagate(e); + if (extractIOExceptionCause(e).isPresent()) + throw e; + throw Throwables.propagate(e); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java index 31ab2a8..2870c03 100644 --- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; +import com.google.common.base.Optional; + import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.streaming.StreamReader; @@ -29,6 +31,8 @@ import 
org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.compress.CompressedStreamReader; import org.apache.cassandra.utils.JVMStabilityInspector; +import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause; + /** * IncomingFileMessage is used to receive the part(or whole) of a SSTable data file. */ @@ -48,26 +52,10 @@ public class IncomingFileMessage extends StreamMessage { return new IncomingFileMessage(reader.read(in), header); } - catch (IOException eof) - { - // Reading from remote failed(i.e. reached EOF before reading expected length of data). - // This can be caused by network/node failure thus we are not retrying - throw eof; - } catch (Throwable t) { - // Throwable can be Runtime error containing IOException. - // In that case we don't want to retry. - Throwable cause = t; - while ((cause = cause.getCause()) != null) - { - if (cause instanceof IOException) - throw (IOException) cause; - } JVMStabilityInspector.inspectThrowable(t); - // Otherwise, we can retry - session.doRetry(header, t); - return null; + throw t; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java index 29e84bf..6673aa1 100644 --- a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java @@ -27,6 +27,10 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.utils.UUIDSerializer; +/** + * @deprecated retry support removed on CASSANDRA-10992 + */ +@Deprecated public class RetryMessage extends StreamMessage { public static Serializer<RetryMessage> serializer = new Serializer<RetryMessage>() 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/utils/Throwables.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java index a895f31..877f388 100644 --- a/src/java/org/apache/cassandra/utils/Throwables.java +++ b/src/java/org/apache/cassandra/utils/Throwables.java @@ -18,6 +18,10 @@ */ package org.apache.cassandra.utils; +import java.io.IOException; + +import com.google.common.base.Optional; + public class Throwables { @@ -50,4 +54,17 @@ public class Throwables } return accumulate; } + + public static Optional<IOException> extractIOExceptionCause(Throwable t) + { + if (t instanceof IOException) + return Optional.of((IOException) t); + Throwable cause = t; + while ((cause = cause.getCause()) != null) + { + if (cause instanceof IOException) + return Optional.of((IOException) cause); + } + return Optional.absent(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java index 0becd18..87d93fd 100644 --- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java +++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java @@ -36,6 +36,7 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.utils.Pair; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; /** */ @@ -44,24 +45,33 @@ public class CompressedInputStreamTest @Test public void testCompressedRead() throws Exception { - testCompressedReadWith(new long[]{0L}, false); - 
testCompressedReadWith(new long[]{1L}, false); - testCompressedReadWith(new long[]{100L}, false); + testCompressedReadWith(new long[]{0L}, false, false); + testCompressedReadWith(new long[]{1L}, false, false); + testCompressedReadWith(new long[]{100L}, false, false); - testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false); + testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false, false); } @Test(expected = EOFException.class) public void testTruncatedRead() throws Exception { - testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true); + testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true, false); + } + + /** + * Test that CompressedInputStream does not block if there's an exception while reading stream + */ + @Test(timeout = 30000) + public void testException() throws Exception + { + testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false, true); } /** * @param valuesToCheck array of longs of range(0-999) * @throws Exception */ - private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception + private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate, boolean testException) throws Exception { assert valuesToCheck != null && valuesToCheck.length > 0; @@ -120,6 +130,12 @@ public class CompressedInputStreamTest // read buffer using CompressedInputStream CompressionInfo info = new CompressionInfo(chunks, param); + + if (testException) + { + testException(sections, info); + return; + } CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info); try (DataInputStream in = new DataInputStream(input)) @@ -132,4 +148,25 @@ public class CompressedInputStreamTest } } } + + private static void testException(List<Pair<Long, Long>> sections, CompressionInfo info) throws IOException + { + CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(new byte[0]), info); + + try (DataInputStream in = 
new DataInputStream(input)) + { + for (int i = 0; i < sections.size(); i++) + { + input.position(sections.get(i).left); + try { + in.readLong(); + fail("Should have thrown IOException"); + } + catch (IOException e) + { + continue; + } + } + } + } }