Fix streaming to catch exceptions so that retry does not fail. Patch by yukim; reviewed by Paulo Motta for CASSANDRA-10557
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/068614cc Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/068614cc Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/068614cc Branch: refs/heads/cassandra-2.2 Commit: 068614ccc7ba6c5b8ccb50a0840af57bb45b4b36 Parents: 986a1a7 Author: Yuki Morishita <yu...@apache.org> Authored: Tue Oct 20 14:25:55 2015 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Tue Nov 3 09:33:37 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/streaming/StreamReader.java | 16 ++++++++++++++-- .../streaming/compress/CompressedStreamReader.java | 17 ++++++++++++++--- 3 files changed, 29 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3d22b91..4c24b35 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.12 + * Fix streaming to catch exception so retry not fail (CASSANDRA-10557) * Add validation method to PerRowSecondaryIndex (CASSANDRA-10092) * Support encrypted and plain traffic on the same port (CASSANDRA-10559) * Do STCS in DTCS windows (CASSANDRA-10276) http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java index c96a925..5389a80 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@ -89,11 +89,12 @@ public class StreamReader } ColumnFamilyStore cfs = 
Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); - SSTableWriter writer = createWriter(cfs, totalSize, repairedAt); DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel))); BytesReadTracker in = new BytesReadTracker(dis); + SSTableWriter writer = null; try { + writer = createWriter(cfs, totalSize, repairedAt); while (in.getBytesRead() < totalSize) { writeRow(writer, in, cfs); @@ -104,7 +105,18 @@ public class StreamReader } catch (Throwable e) { - writer.abort(); + if (writer != null) + { + try + { + writer.abort(); + } + catch (Throwable e2) + { + // add abort error to original and continue so we can drain unread stream + e.addSuppressed(e2); + } + } drain(dis, in.getBytesRead()); if (e instanceof IOException) throw (IOException) e; http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index fb2599f..0529496 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@ -72,12 +72,12 @@ public class CompressedStreamReader extends StreamReader } ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); - SSTableWriter writer = createWriter(cfs, totalSize, repairedAt); - CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums); BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis)); + SSTableWriter writer = null; try { + writer = createWriter(cfs, totalSize, repairedAt); for (Pair<Long, Long> section : sections) { long length = section.right - 
section.left; @@ -95,7 +95,18 @@ public class CompressedStreamReader extends StreamReader } catch (Throwable e) { - writer.abort(); + if (writer != null) + { + try + { + writer.abort(); + } + catch (Throwable e2) + { + // add abort error to original and continue so we can drain unread stream + e.addSuppressed(e2); + } + } drain(cis, in.getBytesRead()); if (e instanceof IOException) throw (IOException) e;