Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a549bd08 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a549bd08 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a549bd08 Branch: refs/heads/cassandra-2.2 Commit: a549bd085f5244b3271249ce881ac30dd3f27553 Parents: 82aa796 068614c Author: Yuki Morishita <yu...@apache.org> Authored: Tue Nov 3 09:40:49 2015 -0600 Committer: Yuki Morishita <yu...@apache.org> Committed: Tue Nov 3 09:40:49 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/streaming/StreamReader.java | 17 ++++++++++++++--- .../streaming/compress/CompressedStreamReader.java | 17 ++++++++++++++--- 3 files changed, 29 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index ac997f2,4c24b35..5c23acf --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,11 -1,5 +1,12 @@@ -2.1.12 +2.2.4 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581) + * Expose phi values from failure detector via JMX and tweak debug + and trace logging (CASSANDRA-9526) + * Fix RangeNamesQueryPager (CASSANDRA-10509) + * Deprecate Pig support (CASSANDRA-10542) + * Reduce contention getting instances of CompositeType (CASSANDRA-10433) +Merged from 2.1: + * Fix streaming to catch exception so retry not fail (CASSANDRA-10557) * Add validation method to PerRowSecondaryIndex (CASSANDRA-10092) * Support encrypted and plain traffic on the same port (CASSANDRA-10559) * Do STCS in DTCS windows (CASSANDRA-10276) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java index 1a3980d,5389a80..1ccebb0 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@@ -94,12 -89,12 +94,12 @@@ public class StreamReade } ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); - SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format); - DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel))); BytesReadTracker in = new BytesReadTracker(dis); + SSTableWriter writer = null; try { - writer = createWriter(cfs, totalSize, repairedAt); ++ writer = createWriter(cfs, totalSize, repairedAt, format); while (in.getBytesRead() < totalSize) { writeRow(writer, in, cfs); @@@ -108,9 -102,21 +108,20 @@@ session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize); } return writer; - } - catch (Throwable e) + } catch (Throwable e) { - writer.abort(); + if (writer != null) + { + try + { + writer.abort(); + } + catch (Throwable e2) + { + // add abort error to original and continue so we can drain unread stream + e.addSuppressed(e2); + } + } drain(dis, in.getBytesRead()); if (e instanceof IOException) throw (IOException) e; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index 1936a94,0529496..facb906 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@@ -75,17 -72,15 +75,17 @@@ public class CompressedStreamReader ext } ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); - SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format); - - CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums); + CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo); BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis)); + SSTableWriter writer = null; try { - writer = createWriter(cfs, totalSize, repairedAt); ++ writer = createWriter(cfs, totalSize, repairedAt, format); for (Pair<Long, Long> section : sections) { - long length = section.right - section.left; + assert cis.getTotalCompressedBytesRead() <= totalSize; + int sectionLength = (int) (section.right - section.left); + // skip to beginning of section inside chunk cis.position(section.left); in.reset(0);