Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ae64cc0d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ae64cc0d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ae64cc0d

Branch: refs/heads/cassandra-2.2
Commit: ae64cc0da4b83289a88478c48ae73f608990a0ec
Parents: afe3fe3 8385bb6
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Nov 17 18:07:42 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 17 18:07:42 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../io/compress/CompressionMetadata.java        | 30 +++++++++++
 .../streaming/messages/FileMessageHeader.java   | 53 ++++++++++++++++++--
 .../streaming/messages/IncomingFileMessage.java |  2 +-
 .../streaming/messages/OutgoingFileMessage.java | 14 ++----
 .../compress/CompressedInputStreamTest.java     |  7 +++
 6 files changed, 91 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae64cc0d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4cb9275,008d4d4..572afc2
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,17 -1,5 +1,18 @@@
 -2.1.12
 +2.2.4
 + * Don't do anticompaction after subrange repair (CASSANDRA-10422)
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 +Merged from 2.1:
+  * Create compression chunk for sending file only (CASSANDRA-10680)
   * Make buffered read size configurable (CASSANDRA-10249)
   * Forbid compact clustering column type changes in ALTER TABLE (CASSANDRA-8879)
   * Reject incremental repair with subrange repair (CASSANDRA-10422)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae64cc0d/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae64cc0d/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index e9c99fe,34d9a01..e9a727f
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@@ -44,14 -43,16 +44,20 @@@ public class FileMessageHeader
      public final int sequenceNumber;
      /** SSTable version */
      public final String version;
 +
 +    /** SSTable format **/
 +    public final SSTableFormat.Type format;
      public final long estimatedKeys;
      public final List<Pair<Long, Long>> sections;
+     /**
+      * Compression info for SSTable to send. Can be null if SSTable is not compressed.
+      * On sender, this field is always null to avoid holding large number of Chunks.
+      * Use compressionMetadata instead.
+      */
      public final CompressionInfo compressionInfo;
+     private final CompressionMetadata compressionMetadata;
      public final long repairedAt;
 +    public final int sstableLevel;
  
      public FileMessageHeader(UUID cfId,
                               int sequenceNumber,
@@@ -70,10 -68,33 +76,38 @@@
          this.estimatedKeys = estimatedKeys;
          this.sections = sections;
          this.compressionInfo = compressionInfo;
+         this.compressionMetadata = null;
+         this.repairedAt = repairedAt;
++        this.sstableLevel = sstableLevel;
+     }
+ 
+     public FileMessageHeader(UUID cfId,
+                              int sequenceNumber,
+                              String version,
++                             SSTableFormat.Type format,
+                              long estimatedKeys,
+                              List<Pair<Long, Long>> sections,
+                              CompressionMetadata compressionMetadata,
 -                             long repairedAt)
++                             long repairedAt,
++                             int sstableLevel)
+     {
+         this.cfId = cfId;
+         this.sequenceNumber = sequenceNumber;
+         this.version = version;
++        this.format = format;
+         this.estimatedKeys = estimatedKeys;
+         this.sections = sections;
+         this.compressionInfo = null;
+         this.compressionMetadata = compressionMetadata;
          this.repairedAt = repairedAt;
 +        this.sstableLevel = sstableLevel;
      }
  
+     public boolean isCompressed()
+     {
+         return compressionInfo != null || compressionMetadata != null;
+     }
+ 
      /**
       * @return total file size to transfer in bytes
       */
@@@ -101,12 -126,10 +139,12 @@@
          sb.append("cfId: ").append(cfId);
          sb.append(", #").append(sequenceNumber);
          sb.append(", version: ").append(version);
 +        sb.append(", format: ").append(format);
          sb.append(", estimated keys: ").append(estimatedKeys);
          sb.append(", transfer size: ").append(size());
-         sb.append(", compressed?: ").append(compressionInfo != null);
+         sb.append(", compressed?: ").append(isCompressed());
          sb.append(", repairedAt: ").append(repairedAt);
 +        sb.append(", level: ").append(sstableLevel);
          sb.append(')');
          return sb.toString();
      }
@@@ -150,9 -166,13 +188,14 @@@
                  out.writeLong(section.left);
                  out.writeLong(section.right);
              }
-             CompressionInfo.serializer.serialize(header.compressionInfo, out, version);
+             // construct CompressionInfo here to avoid holding large number of Chunks on heap.
+             CompressionInfo compressionInfo = null;
+             if (header.compressionMetadata != null)
+                 compressionInfo = new CompressionInfo(header.compressionMetadata.getChunksForSections(header.sections), header.compressionMetadata.parameters);
+             CompressionInfo.serializer.serialize(compressionInfo, out, version);
              out.writeLong(header.repairedAt);
 +            out.writeInt(header.sstableLevel);
+             return compressionInfo;
          }
  
          public FileMessageHeader deserialize(DataInput in, int version) throws IOException
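
The FileMessageHeader change above amounts to this: on the sending side the header no longer carries a materialized CompressionInfo, only a reference to the compression metadata, and the per-section chunk list is built (and immediately serialized) inside serialize(). Below is a rough, self-contained sketch of that pattern; ChunkIndex, chunksFor and buildChunksForSerialization are simplified, hypothetical stand-ins, not Cassandra's actual classes.

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-in for CompressionMetadata: only the chunks covering
    // the transferred sections are ever materialized, and only on demand.
    final class ChunkIndex
    {
        List<long[]> chunksFor(List<long[]> sections)
        {
            List<long[]> chunks = new ArrayList<>();
            for (long[] s : sections)
                chunks.add(new long[]{ s[0], s[1] - s[0] }); // {offset, length}
            return chunks;
        }
    }

    // Hypothetical stand-in for the sender-side FileMessageHeader.
    final class Header
    {
        final List<long[]> sections;
        final ChunkIndex chunkIndex; // null when the SSTable is not compressed

        Header(List<long[]> sections, ChunkIndex chunkIndex)
        {
            this.sections = sections;
            this.chunkIndex = chunkIndex;
        }

        boolean isCompressed()
        {
            return chunkIndex != null;
        }

        // The chunk list exists on the heap only for the duration of this call.
        List<long[]> buildChunksForSerialization()
        {
            return chunkIndex == null ? null : chunkIndex.chunksFor(sections);
        }
    }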

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae64cc0d/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae64cc0d/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 5b34bd8,71902e1..c8175ea
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@@ -65,35 -66,26 +65,29 @@@ public class OutgoingFileMessage extend
          this.header = new FileMessageHeader(sstable.metadata.cfId,
                                              sequenceNumber,
                                              sstable.descriptor.version.toString(),
 +                                            sstable.descriptor.formatType,
                                              estimatedKeys,
                                              sections,
-                                             compressionInfo,
+                                             sstable.compression ? sstable.getCompressionMetadata() : null,
 -                                            repairedAt);
 +                                            repairedAt,
 +                                            keepSSTableLevel ? sstable.getSSTableLevel() : 0);
      }
  
 -    public synchronized void serialize(DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
 +    public synchronized void serialize(DataOutputStreamPlus out, int version, StreamSession session) throws IOException
      {
          if (completed)
          {
              return;
          }
  
-         FileMessageHeader.serializer.serialize(header, out, version);
+         CompressionInfo compressionInfo = FileMessageHeader.serializer.serialize(header, out, version);
  
          final SSTableReader reader = ref.get();
-         StreamWriter writer = header.compressionInfo == null ?
+         StreamWriter writer = compressionInfo == null ?
                                       new StreamWriter(reader, header.sections, session) :
 -                                      new CompressedStreamWriter(reader, header.sections, compressionInfo, session);
 -        writer.write(out.getChannel());
 +                                      new CompressedStreamWriter(reader, header.sections,
-                                                                  header.compressionInfo, session);
++                                                                 compressionInfo, session);
 +        writer.write(out);
      }
  
      public synchronized void complete()
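
The corresponding OutgoingFileMessage change: FileMessageHeader.serializer.serialize(...) now returns the CompressionInfo it built while writing the header, and the plain vs. compressed StreamWriter is chosen from that return value instead of from header.compressionInfo. A minimal dispatch sketch under those assumptions follows, again with hypothetical types (HeaderSerializer, CompressedWriterFactory) rather than the real streaming API.

    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.List;

    final class SendSketch
    {
        // Stand-in for FileMessageHeader.serializer.serialize(header, out, version),
        // which hands back the chunk list it just built (or null if uncompressed).
        interface HeaderSerializer
        {
            List<long[]> serialize(DataOutput out) throws IOException;
        }

        interface StreamWriter
        {
            void write(DataOutput out) throws IOException;
        }

        interface CompressedWriterFactory
        {
            StreamWriter create(List<long[]> chunks);
        }

        static void send(DataOutput out,
                         HeaderSerializer headerSerializer,
                         StreamWriter plainWriter,
                         CompressedWriterFactory compressedWriter) throws IOException
        {
            // Serialize the header first; the chunks come back as a return value.
            List<long[]> chunks = headerSerializer.serialize(out);

            // Pick the writer from the freshly returned chunks, not from header state.
            StreamWriter writer = (chunks == null) ? plainWriter : compressedWriter.create(chunks);
            writer.write(out);
        }
    }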

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae64cc0d/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
