Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0f995a2d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0f995a2d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0f995a2d

Branch: refs/heads/cassandra-3.0
Commit: 0f995a2dc7a116ec6def110e10af6bb9acc9f7b3
Parents: 95012da 582bdba
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jan 13 13:13:11 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jan 13 13:13:11 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  |  2 +-
 .../cassandra/streaming/StreamReader.java       | 20 +++++++--
 .../cassandra/streaming/StreamWriter.java       | 10 ++++-
 .../compress/CompressedInputStream.java         | 41 ++++++-----------
 .../compress/CompressedStreamReader.java        | 26 +++++++----
 .../compress/CompressedStreamWriter.java        | 15 +++++++
 .../compression/CompressedInputStreamTest.java  | 46 --------------------
 8 files changed, 73 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 38786c1,11f2529..614d5b4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,5 +1,21 @@@
 -2.2.5
 +3.0.3
 + * Fix AssertionError when removing from list using UPDATE (CASSANDRA-10954)
 + * Fix UnsupportedOperationException when reading old sstable with range
 +   tombstone (CASSANDRA-10743)
 + * MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
 + * Fix potential assertion error during compaction (CASSANDRA-10944)
 + * Fix counting of received sstables in streaming (CASSANDRA-10949)
 + * Implement hints compression (CASSANDRA-9428)
 + * Fix potential assertion error when reading static columns (CASSANDRA-10903)
 + * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
 + * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
 + * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
 + * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
 + * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
 + * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
 + * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
 +Merged from 2.2:
+  * Fix error streaming section more than 2GB (CASSANDRA-10961)
   * (cqlsh) Also apply --connect-timeout to control connection
     timeout (CASSANDRA-10959)
   * Histogram buckets exposed in jmx are sorted incorrectly (CASSANDRA-10975)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 87dcda0,8789720..268f974
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -61,7 -60,7 +61,8 @@@ public class StreamReade
      protected final long repairedAt;
      protected final SSTableFormat.Type format;
      protected final int sstableLevel;
 +    protected final SerializationHeader.Component header;
+     protected final int fileSeqNum;
  
      protected Descriptor desc;
  
@@@ -75,7 -74,7 +76,8 @@@
          this.repairedAt = header.repairedAt;
          this.format = header.format;
          this.sstableLevel = header.sstableLevel;
 +        this.header = header.header;
+         this.fileSeqNum = header.sequenceNumber;
      }
  
      /**
@@@ -83,10 -82,9 +85,9 @@@
       * @return SSTable transferred
       * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
       */
 -    @SuppressWarnings("resource")
 -    public SSTableWriter read(ReadableByteChannel channel) throws IOException
 +    @SuppressWarnings("resource") // channel needs to remain open, streams on top of it can't be closed
 +    public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
      {
-         logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repairedAt, sstableLevel);
          long totalSize = totalSize();
  
          Pair<String, String> kscf = Schema.instance.getCF(cfId);
@@@ -110,13 -117,25 +118,18 @@@
                  // TODO move this to BytesReadTracker
                  session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
              }
+             logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
+                          session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize);
              return writer;
 -        } catch (Throwable e)
 +        }
 +        catch (Throwable e)
          {
 -            if (key != null)
++            if (deserializer != null)
+                 logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
 -                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
++                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getColumnFamilyName());
              if (writer != null)
              {
 -                try
 -                {
 -                    writer.abort();
 -                }
 -                catch (Throwable e2)
 -                {
 -                    // add abort error to original and continue so we can drain unread stream
 -                    e.addSuppressed(e2);
 -                }
 +                writer.abort(e);
              }
              drain(dis, in.getBytesRead());
              if (e instanceof IOException)
@@@ -126,16 -145,14 +139,15 @@@
          }
      }
  
 -    protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
 +    protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
      {
 -        Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(totalSize);
 +        Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
          if (localDir == null)
              throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
- 
 -        desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir), format));
 +        desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
  
 -        return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel);
 +
 +        return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId));
      }
  
      protected void drain(InputStream dis, long bytesRead) throws IOException
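
The error-handling hunk above replaces a hand-rolled abort/addSuppressed block with a single writer.abort(e) call on the new SSTableMultiWriter. For readers skimming the diff, here is a small self-contained Java sketch of the pattern being consolidated; the types are hypothetical stand-ins, not the Cassandra classes, and the real abort(Throwable) is only assumed to do this bookkeeping internally:

    // Hypothetical stand-in, not Cassandra code: "abort with the original failure".
    interface WriterSketch
    {
        void abort();

        // Single-call form: a secondary failure during abort is attached to the
        // original exception, so the caller can go on to drain the unread stream.
        default void abort(Throwable cause)
        {
            try
            {
                abort();
            }
            catch (Throwable secondary)
            {
                cause.addSuppressed(secondary); // what the removed try/catch did by hand
            }
        }
    }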

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 56dc63a,489fed9..55ac7ac
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -30,8 -30,10 +30,11 @@@ import java.util.zip.Checksum
  import com.google.common.collect.Iterators;
  import com.google.common.primitives.Ints;
  
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
  import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.utils.ChecksumType;
  import org.apache.cassandra.utils.WrappedRunnable;
  
  /**
@@@ -69,17 -71,15 +73,16 @@@ public class CompressedInputStream exte
       * @param source Input source to read compressed data from
       * @param info Compression info
       */
 -    public CompressedInputStream(InputStream source, CompressionInfo info)
 +    public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
      {
          this.info = info;
 -        this.checksum =  new Adler32();
 +        this.checksum =  checksumType.newInstance();
          this.buffer = new byte[info.parameters.chunkLength()];
          // buffer is limited to store up to 1024 chunks
 -        this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
 +        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
 +        this.crcCheckChanceSupplier = crcCheckChanceSupplier;
  
-         readerThread = new Thread(new Reader(source, info, dataBuffer));
-         readerThread.start();
+         new Thread(new Reader(source, info, dataBuffer)).start();
      }
  
      public int read() throws IOException
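
The hunks above widen the CompressedInputStream constructor to take the checksum type of the sender's stream version plus a supplier for the table's CRC check chance. A minimal usage sketch, distilled from CompressedStreamReader above and the updated unit test below; the byte array and CompressionInfo are assumed to hold an already-received compressed section, and the class/method names are illustrative only:

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;

    import org.apache.cassandra.streaming.compress.CompressedInputStream;
    import org.apache.cassandra.streaming.compress.CompressionInfo;
    import org.apache.cassandra.utils.ChecksumType;

    // Sketch only: reads the first long back out of a compressed stream section.
    public class CompressedReadSketch
    {
        static long readFirstLong(byte[] compressedSection, CompressionInfo info) throws IOException
        {
            CompressedInputStream cis = new CompressedInputStream(new ByteArrayInputStream(compressedSection),
                                                                  info,
                                                                  ChecksumType.CRC32, // checksum type used by the sender's stream version
                                                                  () -> 1.0);         // always verify chunk checksums
            try (DataInputStream in = new DataInputStream(cis))
            {
                return in.readLong(); // decompression happens transparently inside the stream
            }
        }
    }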

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 4d10244,c684e4f..5210d5b
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -24,7 -25,8 +24,8 @@@ import java.nio.channels.ReadableByteCh
  
  import com.google.common.base.Throwables;
  
+ import org.apache.cassandra.db.DecoratedKey;
 -import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -60,10 -64,9 +61,9 @@@ public class CompressedStreamReader ext
       * @throws java.io.IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
       */
      @Override
 -    @SuppressWarnings("resource")
 -    public SSTableWriter read(ReadableByteChannel channel) throws IOException
 +    @SuppressWarnings("resource") // channel needs to remain open, streams on top of it can't be closed
 +    public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
      {
-         logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
          long totalSize = totalSize();
  
          Pair<String, String> kscf = Schema.instance.getCF(cfId);
@@@ -72,13 -79,15 +76,16 @@@
              // schema was dropped during streaming
              throw new IOException("CF " + cfId + " was dropped during streaming");
          }
-         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+ 
+         logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+                      session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                      cfs.getColumnFamilyName());
  
 -        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
 +        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
 +                                                              inputVersion.compressedChecksumType(), cfs::getCrcCheckChance);
          BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
 -        SSTableWriter writer = null;
 -        DecoratedKey key = null;
 +        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
 +        SSTableMultiWriter writer = null;
          try
          {
              writer = createWriter(cfs, totalSize, repairedAt, format);
@@@ -102,9 -117,20 +113,12 @@@
          }
          catch (Throwable e)
          {
 -            if (key != null)
++            if (deserializer != null)
+                 logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
 -                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
++                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getColumnFamilyName());
              if (writer != null)
              {
 -                try
 -                {
 -                    writer.abort();
 -                }
 -                catch (Throwable e2)
 -                {
 -                    // add abort error to original and continue so we can drain unread stream
 -                    e.addSuppressed(e2);
 -                }
 +                writer.abort(e);
              }
              drain(cis, in.getBytesRead());
              if (e instanceof IOException)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index adbd091,99e9bd6..f37af29
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@@ -55,7 -59,9 +60,9 @@@ public class CompressedStreamWriter ext
      public void write(DataOutputStreamPlus out) throws IOException
      {
          long totalSize = totalSize();
+         logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                      sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
 -        try (RandomAccessReader file = sstable.openDataReader(); final ChannelProxy fc = file.getChannel())
 +        try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
          {
              long progress = 0L;
              // calculate chunks to transfer. we want to send continuous chunks altogether.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 2162e32,0000000..a3300ac
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@@ -1,183 -1,0 +1,137 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.streaming.compression;
 +
 +import java.io.*;
 +import java.util.*;
 +import java.util.concurrent.SynchronousQueue;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.junit.Test;
 +import org.apache.cassandra.db.ClusteringComparator;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.schema.CompressionParams;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.streaming.compress.CompressedInputStream;
 +import org.apache.cassandra.streaming.compress.CompressionInfo;
 +import org.apache.cassandra.utils.ChecksumType;
 +import org.apache.cassandra.utils.Pair;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +/**
 + */
 +public class CompressedInputStreamTest
 +{
 +    @Test
 +    public void testCompressedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{0L}, false);
 +        testCompressedReadWith(new long[]{1L}, false);
 +        testCompressedReadWith(new long[]{100L}, false);
 +
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
 +    }
 +
 +    @Test(expected = EOFException.class)
 +    public void testTruncatedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
 +    }
 +
 +    /**
-      * Test CompressedInputStream not hang when closed while reading
-      * @throws IOException
-      */
-     @Test(expected = EOFException.class)
-     public void testClose() throws IOException
-     {
-         CompressionParams param = CompressionParams.snappy(32);
-         CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
-         final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
-         InputStream blockingInput = new InputStream()
-         {
-             @Override
-             public int read() throws IOException
-             {
-                 try
-                 {
-                     // 10 second cut off not to stop other test in case
-                     return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
-                 }
-                 catch (InterruptedException e)
-                 {
-                     throw new IOException("Interrupted as expected", e);
-                 }
-             }
-         };
-         CompressionInfo info = new CompressionInfo(chunks, param);
-         try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, ChecksumType.CRC32, () -> 1.0))
-         {
-             new Thread(new Runnable()
-             {
-                 @Override
-                 public void run()
-                 {
-                     try
-                     {
-                         cis.close();
-                     }
-                     catch (Exception ignore) {}
-                 }
-             }).start();
-             // block here
-             cis.read();
-         }
-     }
- 
-     /**
 +     * @param valuesToCheck array of longs of range(0-999)
 +     * @throws Exception
 +     */
 +    private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception
 +    {
 +        assert valuesToCheck != null && valuesToCheck.length > 0;
 +
 +        // write compressed data file of longs
 +        File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
 +        Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
 +        MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
 +        CompressionParams param = CompressionParams.snappy(32);
 +        Map<Long, Long> index = new HashMap<Long, Long>();
 +        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
 +        {
 +            for (long l = 0L; l < 1000; l++)
 +            {
 +                index.put(l, writer.position());
 +                writer.writeLong(l);
 +            }
 +            writer.finish();
 +        }
 +
 +        CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
 +        List<Pair<Long, Long>> sections = new ArrayList<>();
 +        for (long l : valuesToCheck)
 +        {
 +            long position = index.get(l);
 +            sections.add(Pair.create(position, position + 8));
 +        }
 +        CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
 +        long totalSize = comp.getTotalSizeForSections(sections);
 +        long expectedSize = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            expectedSize += c.length + 4;
 +        assertEquals(expectedSize, totalSize);
 +
 +        // buffer up only relevant parts of file
 +        int size = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            size += (c.length + 4); // 4bytes CRC
 +        byte[] toRead = new byte[size];
 +
 +        try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
 +        {
 +            int pos = 0;
 +            for (CompressionMetadata.Chunk c : chunks)
 +            {
 +                f.seek(c.offset);
 +                pos += f.read(toRead, pos, c.length + 4);
 +            }
 +        }
 +
 +        if (testTruncate)
 +        {
 +            byte [] actuallyRead = new byte[50];
 +            System.arraycopy(toRead, 0, actuallyRead, 0, 50);
 +            toRead = actuallyRead;
 +        }
 +
 +        // read buffer using CompressedInputStream
 +        CompressionInfo info = new CompressionInfo(chunks, param);
 +        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info,
 +                                                                ChecksumType.CRC32, () -> 1.0);
 +
 +        try (DataInputStream in = new DataInputStream(input))
 +        {
 +            for (int i = 0; i < sections.size(); i++)
 +            {
 +                input.position(sections.get(i).left);
 +                long readValue = in.readLong();
 +                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
 +            }
 +        }
 +    }
 +}
