Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0f995a2d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0f995a2d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0f995a2d

Branch: refs/heads/cassandra-3.0
Commit: 0f995a2dc7a116ec6def110e10af6bb9acc9f7b3
Parents: 95012da 582bdba
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jan 13 13:13:11 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jan 13 13:13:11 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  |  2 +-
 .../cassandra/streaming/StreamReader.java       | 20 +++++++--
 .../cassandra/streaming/StreamWriter.java       | 10 ++++-
 .../compress/CompressedInputStream.java         | 41 ++++++-----------
 .../compress/CompressedStreamReader.java        | 26 +++++++----
 .../compress/CompressedStreamWriter.java        | 15 +++++++
 .../compression/CompressedInputStreamTest.java  | 46 --------------------
 8 files changed, 73 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 38786c1,11f2529..614d5b4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,5 +1,21 @@@
-2.2.5
+3.0.3
+ * Fix AssertionError when removing from list using UPDATE (CASSANDRA-10954)
+ * Fix UnsupportedOperationException when reading old sstable with range
+   tombstone (CASSANDRA-10743)
+ * MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
+ * Fix potential assertion error during compaction (CASSANDRA-10944)
+ * Fix counting of received sstables in streaming (CASSANDRA-10949)
+ * Implement hints compression (CASSANDRA-9428)
+ * Fix potential assertion error when reading static columns (CASSANDRA-10903)
+ * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
+ * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
+ * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
+ * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
+ * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
+ * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
+ * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
+Merged from 2.2:
+ * Fix error streaming section more than 2GB (CASSANDRA-10961)
  * (cqlsh) Also apply --connect-timeout to control connection timeout (CASSANDRA-10959)
  * Histogram buckets exposed in jmx are sorted incorrectly (CASSANDRA-10975)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 87dcda0,8789720..268f974
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -61,7 -60,7 +61,8 @@@ public class StreamReade
      protected final long repairedAt;
      protected final SSTableFormat.Type format;
      protected final int sstableLevel;
 +    protected final SerializationHeader.Component header;
 +    protected final int fileSeqNum;

      protected Descriptor desc;

@@@ -75,7 -74,7 +76,8 @@@
          this.repairedAt = header.repairedAt;
          this.format = header.format;
          this.sstableLevel = header.sstableLevel;
 +        this.header = header.header;
 +        this.fileSeqNum = header.sequenceNumber;
      }

      /**
@@@ -83,10 -82,9 +85,9 @@@
       * @return SSTable transferred
       * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
       */
-     @SuppressWarnings("resource")
-     public SSTableWriter read(ReadableByteChannel channel) throws IOException
+     @SuppressWarnings("resource") // channel needs to remain open, streams on top of it can't be closed
+     public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
      {
-         logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repairedAt, sstableLevel);
          long totalSize = totalSize();
          Pair<String, String> kscf = Schema.instance.getCF(cfId);
@@@ -110,13 -117,25 +118,18 @@@
                  // TODO move this to BytesReadTracker
                  session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
              }
+             logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
+                          session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize);
              return writer;
-         } catch (Throwable e)
+         }
+         catch (Throwable e)
          {
-             if (key != null)
++            if (deserializer != null)
+                 logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
-                             session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
++                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getColumnFamilyName());
              if (writer != null)
              {
-                 try
-                 {
-                     writer.abort();
-                 }
-                 catch (Throwable e2)
-                 {
-                     // add abort error to original and continue so we can drain unread stream
-                     e.addSuppressed(e2);
-                 }
+                 writer.abort(e);
              }
              drain(dis, in.getBytesRead());
              if (e instanceof IOException)
@@@ -126,16 -145,14 +139,15 @@@
          }
      }

-     protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
+     protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
      {
-         Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(totalSize);
+         Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
          if (localDir == null)
              throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
-
-         desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir), format));
+         desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
-         return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel);
+
+         return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId));
      }

      protected void drain(InputStream dis, long bytesRead) throws IOException


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 56dc63a,489fed9..55ac7ac
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -30,8 -30,10 +30,11 @@@ import java.util.zip.Checksum
  import com.google.common.collect.Iterators;
  import com.google.common.primitives.Ints;

+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
  import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.utils.ChecksumType;
  import org.apache.cassandra.utils.WrappedRunnable;

  /**
@@@ -69,17 -71,15 +73,16 @@@ public class CompressedInputStream exte
       * @param source Input source to read compressed data from
       * @param info Compression info
       */
-     public CompressedInputStream(InputStream source, CompressionInfo info)
+     public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
      {
          this.info = info;
-         this.checksum = new Adler32();
+         this.checksum = checksumType.newInstance();
          this.buffer = new byte[info.parameters.chunkLength()];
          // buffer is limited to store up to 1024 chunks
-         this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+         this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
+         this.crcCheckChanceSupplier = crcCheckChanceSupplier;

-         readerThread = new Thread(new Reader(source, info, dataBuffer));
-         readerThread.start();
+         new Thread(new Reader(source, info, dataBuffer)).start();
      }

      public int read() throws IOException


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 4d10244,c684e4f..5210d5b
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -24,7 -25,8 +24,8 @@@ import java.nio.channels.ReadableByteCh
  import com.google.common.base.Throwables;

+ import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

@@@ -60,10 -64,9 +61,9 @@@ public class CompressedStreamReader ext
       * @throws java.io.IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
       */
      @Override
-     @SuppressWarnings("resource")
-     public SSTableWriter read(ReadableByteChannel channel) throws IOException
+     @SuppressWarnings("resource") // channel needs to remain open, streams on top of it can't be closed
+     public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
      {
-         logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
          long totalSize = totalSize();
          Pair<String, String> kscf = Schema.instance.getCF(cfId);
@@@ -72,13 -79,15 +76,16 @@@
              // schema was dropped during streaming
              throw new IOException("CF " + cfId + " was dropped during streaming");
          }
-         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+         logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+                      session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                      cfs.getColumnFamilyName());

-         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
+         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
+                                                               inputVersion.compressedChecksumType(), cfs::getCrcCheckChance);
          BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
-         SSTableWriter writer = null;
-         DecoratedKey key = null;
+         StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
+         SSTableMultiWriter writer = null;
          try
          {
              writer = createWriter(cfs, totalSize, repairedAt, format);
@@@ -102,9 -117,20 +113,12 @@@
          }
          catch (Throwable e)
          {
-             if (key != null)
++            if (deserializer != null)
+                 logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
-                             session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
++                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getColumnFamilyName());
              if (writer != null)
              {
-                 try
-                 {
-                     writer.abort();
-                 }
-                 catch (Throwable e2)
-                 {
-                     // add abort error to original and continue so we can drain unread stream
-                     e.addSuppressed(e2);
-                 }
+                 writer.abort(e);
              }
              drain(cis, in.getBytesRead());
              if (e instanceof IOException)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index adbd091,99e9bd6..f37af29
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@@ -55,7 -59,9 +60,9 @@@ public class CompressedStreamWriter ext
      public void write(DataOutputStreamPlus out) throws IOException
      {
          long totalSize = totalSize();
+         logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                      sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
-         try (RandomAccessReader file = sstable.openDataReader(); final ChannelProxy fc = file.getChannel())
+         try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
          {
              long progress = 0L;

              // calculate chunks to transfer. we want to send continuous chunks altogether.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 2162e32,0000000..a3300ac
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@@ -1,183 -1,0 +1,137 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.compression;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.streaming.compress.CompressedInputStream;
+import org.apache.cassandra.streaming.compress.CompressionInfo;
+import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.Pair;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ */
+public class CompressedInputStreamTest
+{
+    @Test
+    public void testCompressedRead() throws Exception
+    {
+        testCompressedReadWith(new long[]{0L}, false);
+        testCompressedReadWith(new long[]{1L}, false);
+        testCompressedReadWith(new long[]{100L}, false);
+
+        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
+    }
+
+    @Test(expected = EOFException.class)
+    public void testTruncatedRead() throws Exception
+    {
+        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
+    }
+
+    /**
-     * Test CompressedInputStream not hang when closed while reading
-     * @throws IOException
-     */
-    @Test(expected = EOFException.class)
-    public void testClose() throws IOException
-    {
-        CompressionParams param = CompressionParams.snappy(32);
-        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
-        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
-        InputStream blockingInput = new InputStream()
-        {
-            @Override
-            public int read() throws IOException
-            {
-                try
-                {
-                    // 10 second cut off not to stop other test in case
-                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
-                }
-                catch (InterruptedException e)
-                {
-                    throw new IOException("Interrupted as expected", e);
-                }
-            }
-        };
-        CompressionInfo info = new CompressionInfo(chunks, param);
-        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, ChecksumType.CRC32, () -> 1.0))
-        {
-            new Thread(new Runnable()
-            {
-                @Override
-                public void run()
-                {
-                    try
-                    {
-                        cis.close();
-                    }
-                    catch (Exception ignore) {}
-                }
-            }).start();
-            // block here
-            cis.read();
-        }
-    }
-
-    /**
+     * @param valuesToCheck array of longs of range(0-999)
+     * @throws Exception
+     */
+    private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception
+    {
+        assert valuesToCheck != null && valuesToCheck.length > 0;
+
+        // write compressed data file of longs
+        File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
+        Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
+        MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
+        CompressionParams param = CompressionParams.snappy(32);
+        Map<Long, Long> index = new HashMap<Long, Long>();
+        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
+        {
+            for (long l = 0L; l < 1000; l++)
+            {
+                index.put(l, writer.position());
+                writer.writeLong(l);
+            }
+            writer.finish();
+        }
+
+        CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
+        List<Pair<Long, Long>> sections = new ArrayList<>();
+        for (long l : valuesToCheck)
+        {
+            long position = index.get(l);
+            sections.add(Pair.create(position, position + 8));
+        }
+        CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
+        long totalSize = comp.getTotalSizeForSections(sections);
+        long expectedSize = 0;
+        for (CompressionMetadata.Chunk c : chunks)
+            expectedSize += c.length + 4;
+        assertEquals(expectedSize, totalSize);
+
+        // buffer up only relevant parts of file
+        int size = 0;
+        for (CompressionMetadata.Chunk c : chunks)
+            size += (c.length + 4); // 4bytes CRC
+        byte[] toRead = new byte[size];
+
+        try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
+        {
+            int pos = 0;
+            for (CompressionMetadata.Chunk c : chunks)
+            {
+                f.seek(c.offset);
+                pos += f.read(toRead, pos, c.length + 4);
+            }
+        }
+
+        if (testTruncate)
+        {
+            byte [] actuallyRead = new byte[50];
+            System.arraycopy(toRead, 0, actuallyRead, 0, 50);
+            toRead = actuallyRead;
+        }
+
+        // read buffer using CompressedInputStream
+        CompressionInfo info = new CompressionInfo(chunks, param);
+        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info,
+                                                                ChecksumType.CRC32, () -> 1.0);
+
+        try (DataInputStream in = new DataInputStream(input))
+        {
+            for (int i = 0; i < sections.size(); i++)
+            {
+                input.position(sections.get(i).left);
+                long readValue = in.readLong();
+                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+            }
+        }
+    }
+}