Repository: cassandra Updated Branches: refs/heads/trunk 31eac784f -> d81dc27c7
Remove pre-3.0 streaming compatibility code for 4.0 Patch by Paulo Motta; Reviewed by Sylvain Lebresne for CASSANDRA-13081 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d81dc27c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d81dc27c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d81dc27c Branch: refs/heads/trunk Commit: d81dc27c7bde7c44a3c00526d803d9d5c7fe2604 Parents: 31eac78 Author: Paulo Motta <pauloricard...@gmail.com> Authored: Wed Jan 4 12:04:09 2017 -0200 Committer: Paulo Motta <pa...@apache.org> Committed: Wed Feb 15 13:45:53 2017 -0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/streaming/StreamReader.java | 37 ++------------------ .../compress/CompressedStreamReader.java | 10 ++---- 3 files changed, 5 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d81dc27c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6efcaa3..0a76400 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Remove pre-3.0 streaming compatibility code for 4.0 (CASSANDRA-13081) * Add support for + and - operations on dates (CASSANDRA-11936) * Fix consistency of incrementally repaired data (CASSANDRA-9143) * Increase commitlog version (CASSANDRA-13161) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d81dc27c/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java index fdc2ae2..7d00e48 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@ -40,7 +40,6 @@ import org.apache.cassandra.io.sstable.SSTableSimpleIterator; import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.Version; -import org.apache.cassandra.io.util.RewindableDataInputStreamPlus; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.streaming.messages.FileMessageHeader; import org.apache.cassandra.utils.ByteBufferUtil; @@ -118,20 +117,14 @@ public class StreamReader } catch (Throwable e) { - if (deserializer != null) - logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.", - session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getTableName(), e); + logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.", + session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getTableName(), e); if (writer != null) { writer.abort(e); } throw Throwables.propagate(e); } - finally - { - if (deserializer != null) - deserializer.cleanup(); - } } protected SerializationHeader getHeader(TableMetadata metadata) @@ -166,13 +159,6 @@ public class StreamReader public static class StreamDeserializer extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator { - public static final int INITIAL_MEM_BUFFER_SIZE = Integer.getInteger("cassandra.streamdes.initial_mem_buffer_size", 32768); - public static final int MAX_MEM_BUFFER_SIZE = Integer.getInteger("cassandra.streamdes.max_mem_buffer_size", 1048576); - public static final int MAX_SPILL_FILE_SIZE = Integer.getInteger("cassandra.streamdes.max_spill_file_size", Integer.MAX_VALUE); - - public static final String BUFFER_FILE_PREFIX = "buf"; - public static final String BUFFER_FILE_SUFFIX = "dat"; - private final TableMetadata metadata; private final DataInputPlus in; private final SerializationHeader header; @@ -279,24 +265,5 @@ public class StreamReader public void close() { } - - /* We have a separate cleanup method because sometimes close is called before exhausting the - StreamDeserializer (for instance, when enclosed in an try-with-resources wrapper, such as in - BigTableWriter.append()). - */ - public void cleanup() - { - if (in instanceof RewindableDataInputStreamPlus) - { - try - { - ((RewindableDataInputStreamPlus) in).close(false); - } - catch (IOException e) - { - logger.warn("Error while closing RewindableDataInputStreamPlus.", e); - } - } - } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d81dc27c/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index da62aa9..3e53fa2 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@ -110,9 +110,8 @@ public class CompressedStreamReader extends StreamReader } catch (Throwable e) { - if (deserializer != null) - logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.", - session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getTableName()); + logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.", + session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getTableName()); if (writer != null) { writer.abort(e); @@ -121,11 +120,6 @@ public class CompressedStreamReader extends StreamReader throw e; throw Throwables.propagate(e); } - finally - { - if (deserializer != null) - deserializer.cleanup(); - } } @Override