Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/be211749 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/be211749 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/be211749 Branch: refs/heads/trunk Commit: be2117492f3d9ace24bbf18e57e94b2a08965763 Parents: 2a24acf 95839aa Author: Blake Eggleston <bdeggles...@gmail.com> Authored: Fri Sep 29 15:31:37 2017 -0700 Committer: Blake Eggleston <bdeggles...@gmail.com> Committed: Fri Sep 29 15:32:32 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../cassandra/db/commitlog/CommitLogReader.java | 53 +++++++++++++++++++- .../db/commitlog/CommitLogReplayer.java | 3 -- .../db/commitlog/CommitLogSegment.java | 2 +- .../db/commitlog/CompressedSegment.java | 2 +- .../db/commitlog/EncryptedSegment.java | 4 +- .../db/commitlog/MemoryMappedSegment.java | 2 +- .../cassandra/db/commitlog/CommitLogTest.java | 53 ++++++++++++++++++++ 8 files changed, 110 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/be211749/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 6c3a1d0,7ff61d3..a782333 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,20 -1,9 +1,20 @@@ -3.0.15 +3.11.1 - ======= + * Fix the computation of cdc_total_space_in_mb for exabyte filesystems (CASSANDRA-13808) + * Handle limit correctly on tables with strict liveness (CASSANDRA-13883) + * AbstractTokenTreeBuilder#serializedSize returns wrong value when there is a single leaf and overflow collisions (CASSANDRA-13869) + * Add a compaction option to TWCS to ignore sstables overlapping checks (CASSANDRA-13418) + * BTree.Builder memory leak (CASSANDRA-13754) + * Revert CASSANDRA-10368 of supporting non-pk column filtering due to correctness (CASSANDRA-13798) + * Fix cassandra-stress hang issues when an error during cluster connection happens (CASSANDRA-12938) + * Better bootstrap failure message when blocked by (potential) range movement (CASSANDRA-13744) + * "ignore" option is ignored in sstableloader (CASSANDRA-13721) + * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652) + * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512) + * Properly evict pstmts from prepared statements cache (CASSANDRA-13641) +Merged from 3.0: + * Filter header only commit logs before recovery (CASSANDRA-13918) * AssertionError prepending to a list (CASSANDRA-13149) * Fix support for SuperColumn tables (CASSANDRA-12373) - * Handle limit correctly on tables with strict liveness (CASSANDRA-13883) - * Fix missing original update in TriggerExecutor (CASSANDRA-13894) * Remove non-rpc-ready nodes from counter leader candidates (CASSANDRA-13043) * Improve short read protection performance (CASSANDRA-13794) * Fix sstable reader to support range-tombstone-marker for multi-slices (CASSANDRA-13787) http://git-wip-us.apache.org/repos/asf/cassandra/blob/be211749/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java index 8c04329,0000000..4d74557 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java @@@ -1,515 -1,0 +1,564 @@@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.zip.CRC32; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.UnknownColumnFamilyException; +import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadErrorReason; +import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.SerializationHelper; ++import org.apache.cassandra.exceptions.ConfigurationException; ++import org.apache.cassandra.io.util.ChannelProxy; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.io.util.RebufferingInputStream; +import org.apache.cassandra.utils.JVMStabilityInspector; + +import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; + +public class CommitLogReader +{ + private static final Logger logger = LoggerFactory.getLogger(CommitLogReader.class); + + private static final int LEGACY_END_OF_SEGMENT_MARKER = 0; + + @VisibleForTesting + public static final int ALL_MUTATIONS = -1; + private final CRC32 checksum; + private final Map<UUID, AtomicInteger> invalidMutations; + + private byte[] buffer; + + public CommitLogReader() + { + checksum = new CRC32(); + invalidMutations = new HashMap<>(); + buffer = new byte[4096]; + } + + public Set<Map.Entry<UUID, AtomicInteger>> getInvalidMutations() + { + return invalidMutations.entrySet(); + } + + /** + * Reads all passed in files with no minimum, no start, and no mutation limit. 
+ */ + public void readAllFiles(CommitLogReadHandler handler, File[] files) throws IOException + { + readAllFiles(handler, files, CommitLogPosition.NONE); + } + ++ private static boolean shouldSkip(File file) throws IOException, ConfigurationException ++ { ++ CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); ++ if (desc.version < CommitLogDescriptor.VERSION_21) ++ { ++ return false; ++ } ++ try(RandomAccessReader reader = RandomAccessReader.open(file)) ++ { ++ CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext()); ++ int end = reader.readInt(); ++ long filecrc = reader.readInt() & 0xffffffffL; ++ return end == 0 && filecrc == 0; ++ } ++ } ++ ++ private static List<File> filterCommitLogFiles(File[] toFilter) ++ { ++ List<File> filtered = new ArrayList<>(toFilter.length); ++ for (File file: toFilter) ++ { ++ try ++ { ++ if (shouldSkip(file)) ++ { ++ logger.info("Skipping playback of empty log: {}", file.getName()); ++ } ++ else ++ { ++ filtered.add(file); ++ } ++ } ++ catch (Exception e) ++ { ++ // let recover deal with it ++ filtered.add(file); ++ } ++ } ++ ++ return filtered; ++ } ++ + /** + * Reads all passed in files with minPosition, no start, and no mutation limit. + */ + public void readAllFiles(CommitLogReadHandler handler, File[] files, CommitLogPosition minPosition) throws IOException + { - for (int i = 0; i < files.length; i++) - readCommitLogSegment(handler, files[i], minPosition, ALL_MUTATIONS, i + 1 == files.length); ++ List<File> filteredLogs = filterCommitLogFiles(files); ++ int i = 0; ++ for (File file: filteredLogs) ++ { ++ i++; ++ readCommitLogSegment(handler, file, minPosition, ALL_MUTATIONS, i == filteredLogs.size()); ++ } + } + + /** + * Reads passed in file fully + */ + public void readCommitLogSegment(CommitLogReadHandler handler, File file, boolean tolerateTruncation) throws IOException + { + readCommitLogSegment(handler, file, CommitLogPosition.NONE, ALL_MUTATIONS, tolerateTruncation); + } + + /** + * Reads passed in file fully, up to mutationLimit count + */ + @VisibleForTesting + public void readCommitLogSegment(CommitLogReadHandler handler, File file, int mutationLimit, boolean tolerateTruncation) throws IOException + { + readCommitLogSegment(handler, file, CommitLogPosition.NONE, mutationLimit, tolerateTruncation); + } + + /** + * Reads mutations from file, handing them off to handler + * @param handler Handler that will take action based on deserialized Mutations + * @param file CommitLogSegment file to read + * @param minPosition Optional minimum CommitLogPosition - all segments with id > or matching w/greater position will be read + * @param mutationLimit Optional limit on # of mutations to replay. Local ALL_MUTATIONS serves as marker to play all. 
+ * @param tolerateTruncation Whether or not we should allow truncation of this file or throw if EOF found + * + * @throws IOException + */ + public void readCommitLogSegment(CommitLogReadHandler handler, + File file, + CommitLogPosition minPosition, + int mutationLimit, + boolean tolerateTruncation) throws IOException + { + // just transform from the file name (no reading of headers) to determine version + CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); + + try(RandomAccessReader reader = RandomAccessReader.open(file)) + { + if (desc.version < CommitLogDescriptor.VERSION_21) + { + if (!shouldSkipSegmentId(file, desc, minPosition)) + { + if (minPosition.segmentId == desc.id) + reader.seek(minPosition.position); + ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation); + statusTracker.errorContext = desc.fileName(); + readSection(handler, reader, minPosition, (int) reader.length(), statusTracker, desc); + } + return; + } + + final long segmentIdFromFilename = desc.id; + try + { + // The following call can either throw or legitimately return null. For either case, we need to check + // desc outside this block and set it to null in the exception case. + desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext()); + } + catch (Exception e) + { + desc = null; + } + if (desc == null) + { + // don't care about whether or not the handler thinks we can continue. We can't w/out descriptor. + // whether or not we continue with startup will depend on whether this is the last segment + handler.handleUnrecoverableError(new CommitLogReadException( + String.format("Could not read commit log descriptor in file %s", file), + CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR, + tolerateTruncation)); + return; + } + + if (segmentIdFromFilename != desc.id) + { + if (handler.shouldSkipSegmentOnError(new CommitLogReadException(String.format( + "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentIdFromFilename, desc.id, file), + CommitLogReadErrorReason.RECOVERABLE_DESCRIPTOR_ERROR, + false))) + { + return; + } + } + + if (shouldSkipSegmentId(file, desc, minPosition)) + return; + + CommitLogSegmentReader segmentReader; + try + { + segmentReader = new CommitLogSegmentReader(handler, desc, reader, tolerateTruncation); + } + catch(Exception e) + { + handler.handleUnrecoverableError(new CommitLogReadException( + String.format("Unable to create segment reader for commit log file: %s", e), + CommitLogReadErrorReason.UNRECOVERABLE_UNKNOWN_ERROR, + tolerateTruncation)); + return; + } + + try + { + ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation); + for (CommitLogSegmentReader.SyncSegment syncSegment : segmentReader) + { + // Only tolerate truncation if we allow in both global and segment + statusTracker.tolerateErrorsInSection = tolerateTruncation & syncSegment.toleratesErrorsInSection; + + // Skip segments that are completely behind the desired minPosition + if (desc.id == minPosition.segmentId && syncSegment.endPosition < minPosition.position) + continue; + + statusTracker.errorContext = String.format("Next section at %d in %s", syncSegment.fileStartPosition, desc.fileName()); + + readSection(handler, syncSegment.input, minPosition, syncSegment.endPosition, statusTracker, desc); + if (!statusTracker.shouldContinue()) + break; + } + } + // Unfortunately AbstractIterator cannot throw a checked exception, so we check to see if a RuntimeException + // is wrapping an 
IOException. + catch (RuntimeException re) + { + if (re.getCause() instanceof IOException) + throw (IOException) re.getCause(); + throw re; + } + logger.debug("Finished reading {}", file); + } + } + + /** + * Any segment with id >= minPosition.segmentId is a candidate for read. + */ + private boolean shouldSkipSegmentId(File file, CommitLogDescriptor desc, CommitLogPosition minPosition) + { + logger.debug("Reading {} (CL version {}, messaging version {}, compression {})", + file.getPath(), + desc.version, + desc.getMessagingVersion(), + desc.compression); + + if (minPosition.segmentId > desc.id) + { + logger.trace("Skipping read of fully-flushed {}", file); + return true; + } + return false; + } + + /** + * Reads a section of a file containing mutations + * + * @param handler Handler that will take action based on deserialized Mutations + * @param reader FileDataInput / logical buffer containing commitlog mutations + * @param minPosition CommitLogPosition indicating when we should start actively replaying mutations + * @param end logical numeric end of the segment being read + * @param statusTracker ReadStatusTracker with current state of mutation count, error state, etc + * @param desc Descriptor for CommitLog serialization + */ + private void readSection(CommitLogReadHandler handler, + FileDataInput reader, + CommitLogPosition minPosition, + int end, + ReadStatusTracker statusTracker, + CommitLogDescriptor desc) throws IOException + { + // seek rather than deserializing mutation-by-mutation to reach the desired minPosition in this SyncSegment + if (desc.id == minPosition.segmentId && reader.getFilePointer() < minPosition.position) + reader.seek(minPosition.position); + + while (statusTracker.shouldContinue() && reader.getFilePointer() < end && !reader.isEOF()) + { + long mutationStart = reader.getFilePointer(); + if (logger.isTraceEnabled()) + logger.trace("Reading mutation at {}", mutationStart); + + long claimedCRC32; + int serializedSize; + try + { + // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end + // of a segment, which happens naturally due to the 0 padding of the empty segment on creation. + // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes + // from the end of the file, which means that we'll be unable to read a full int and instead + // read an EOF here + if(end - reader.getFilePointer() < 4) + { + logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing"); + statusTracker.requestTermination(); + return; + } + + // any of the reads may hit EOF + serializedSize = reader.readInt(); + if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER) + { + logger.trace("Encountered end of segment marker at {}", reader.getFilePointer()); + statusTracker.requestTermination(); + return; + } + + // Mutation must be at LEAST 10 bytes: + // 3 for a non-empty Keyspace + // 3 for a Key (including the 2-byte length from writeUTF/writeWithShortLength) + // 4 bytes for column count. 
+ // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128 + if (serializedSize < 10) + { + if (handler.shouldSkipSegmentOnError(new CommitLogReadException( + String.format("Invalid mutation size %d at %d in %s", serializedSize, mutationStart, statusTracker.errorContext), + CommitLogReadErrorReason.MUTATION_ERROR, + statusTracker.tolerateErrorsInSection))) + { + statusTracker.requestTermination(); + } + return; + } + + long claimedSizeChecksum = CommitLogFormat.calculateClaimedChecksum(reader, desc.version); + checksum.reset(); + CommitLogFormat.updateChecksum(checksum, serializedSize, desc.version); + + if (checksum.getValue() != claimedSizeChecksum) + { + if (handler.shouldSkipSegmentOnError(new CommitLogReadException( + String.format("Mutation size checksum failure at %d in %s", mutationStart, statusTracker.errorContext), + CommitLogReadErrorReason.MUTATION_ERROR, + statusTracker.tolerateErrorsInSection))) + { + statusTracker.requestTermination(); + } + return; + } + + if (serializedSize > buffer.length) + buffer = new byte[(int) (1.2 * serializedSize)]; + reader.readFully(buffer, 0, serializedSize); + + claimedCRC32 = CommitLogFormat.calculateClaimedCRC32(reader, desc.version); + } + catch (EOFException eof) + { + if (handler.shouldSkipSegmentOnError(new CommitLogReadException( + String.format("Unexpected end of segment at %d in %s", mutationStart, statusTracker.errorContext), + CommitLogReadErrorReason.EOF, + statusTracker.tolerateErrorsInSection))) + { + statusTracker.requestTermination(); + } + return; + } + + checksum.update(buffer, 0, serializedSize); + if (claimedCRC32 != checksum.getValue()) + { + if (handler.shouldSkipSegmentOnError(new CommitLogReadException( + String.format("Mutation checksum failure at %d in %s", mutationStart, statusTracker.errorContext), + CommitLogReadErrorReason.MUTATION_ERROR, + statusTracker.tolerateErrorsInSection))) + { + statusTracker.requestTermination(); + } + continue; + } + + long mutationPosition = reader.getFilePointer(); + readMutation(handler, buffer, serializedSize, minPosition, (int)mutationPosition, desc); + + // Only count this as a processed mutation if it is after our min as we suppress reading of mutations that + // are before this mark. + if (mutationPosition >= minPosition.position) + statusTracker.addProcessedMutation(); + } + } + + /** + * Deserializes and passes a Mutation to the ICommitLogReadHandler requested + * + * @param handler Handler that will take action based on deserialized Mutations + * @param inputBuffer raw byte array w/Mutation data + * @param size deserialized size of mutation + * @param minPosition We need to suppress replay of mutations that are before the required minPosition + * @param entryLocation filePointer offset of mutation within CommitLogSegment + * @param desc CommitLogDescriptor being worked on + */ + @VisibleForTesting + protected void readMutation(CommitLogReadHandler handler, + byte[] inputBuffer, + int size, + CommitLogPosition minPosition, + final int entryLocation, + final CommitLogDescriptor desc) throws IOException + { + // For now, we need to go through the motions of deserializing the mutation to determine its size and move + // the file pointer forward accordingly, even if we're behind the requested minPosition within this SyncSegment. 
+ boolean shouldReplay = entryLocation > minPosition.position; + + final Mutation mutation; + try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size)) + { + mutation = Mutation.serializer.deserialize(bufIn, + desc.getMessagingVersion(), + SerializationHelper.Flag.LOCAL); + // doublecheck that what we read is still valid for the current schema + for (PartitionUpdate upd : mutation.getPartitionUpdates()) + upd.validate(); + } + catch (UnknownColumnFamilyException ex) + { + if (ex.cfId == null) + return; + AtomicInteger i = invalidMutations.get(ex.cfId); + if (i == null) + { + i = new AtomicInteger(1); + invalidMutations.put(ex.cfId, i); + } + else + i.incrementAndGet(); + return; + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + File f = File.createTempFile("mutation", "dat"); + + try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f))) + { + out.write(inputBuffer, 0, size); + } + + // Checksum passed so this error can't be permissible. + handler.handleUnrecoverableError(new CommitLogReadException( + String.format( + "Unexpected error deserializing mutation; saved to %s. " + + "This may be caused by replaying a mutation against a table with the same name but incompatible schema. " + + "Exception follows: %s", f.getAbsolutePath(), t), + CommitLogReadErrorReason.MUTATION_ERROR, + false)); + return; + } + + if (logger.isTraceEnabled()) + logger.trace("Read mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(), + "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}"); + + if (shouldReplay) + handler.handleMutation(mutation, size, entryLocation, desc); + } + + /** + * Helper methods to deal with changing formats of internals of the CommitLog without polluting deserialization code. 
+ */ + private static class CommitLogFormat + { + public static long calculateClaimedChecksum(FileDataInput input, int commitLogVersion) throws IOException + { + switch (commitLogVersion) + { + case CommitLogDescriptor.VERSION_12: + case CommitLogDescriptor.VERSION_20: + return input.readLong(); + // Changed format in 2.1 + default: + return input.readInt() & 0xffffffffL; + } + } + + public static void updateChecksum(CRC32 checksum, int serializedSize, int commitLogVersion) + { + switch (commitLogVersion) + { + case CommitLogDescriptor.VERSION_12: + checksum.update(serializedSize); + break; + // Changed format in 2.0 + default: + updateChecksumInt(checksum, serializedSize); + break; + } + } + + public static long calculateClaimedCRC32(FileDataInput input, int commitLogVersion) throws IOException + { + switch (commitLogVersion) + { + case CommitLogDescriptor.VERSION_12: + case CommitLogDescriptor.VERSION_20: + return input.readLong(); + // Changed format in 2.1 + default: + return input.readInt() & 0xffffffffL; + } + } + } + + private static class ReadStatusTracker + { + private int mutationsLeft; + public String errorContext = ""; + public boolean tolerateErrorsInSection; + private boolean error; + + public ReadStatusTracker(int mutationLimit, boolean tolerateErrorsInSection) + { + this.mutationsLeft = mutationLimit; + this.tolerateErrorsInSection = tolerateErrorsInSection; + } + + public void addProcessedMutation() + { + if (mutationsLeft == ALL_MUTATIONS) + return; + --mutationsLeft; + } + + public boolean shouldContinue() + { + return !error && (mutationsLeft != 0 || mutationsLeft == ALL_MUTATIONS); + } + + public void requestTermination() + { + error = true; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/be211749/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 4d2971f,4fd263c..ea62fd8 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@@ -18,23 -18,26 +18,20 @@@ */ package org.apache.cassandra.db.commitlog; -import java.io.DataOutputStream; -import java.io.EOFException; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.*; - import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; -import java.util.zip.CRC32; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; - import com.google.common.base.Throwables; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; -import com.google.common.collect.Ordering; - +import com.google.common.collect.*; - import com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.lang3.StringUtils; +import org.cliffc.high_scale_lib.NonBlockingHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; http://git-wip-us.apache.org/repos/asf/cassandra/blob/be211749/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --cc 
src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index 8670c64,236a1b1..a618d0b --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@@ -337,20 -315,8 +337,20 @@@ public abstract class CommitLogSegmen syncComplete.signalAll(); } + /** + * Create a sync marker to delineate sections of the commit log, typically created on each sync of the file. + * The sync marker consists of a file pointer to where the next sync marker should be (effectively declaring the length + * of this section), as well as a CRC value. + * + * @param buffer buffer in which to write out the sync marker. + * @param offset Offset into the {@code buffer} at which to write the sync marker. + * @param filePos The current position in the target file where the sync marker will be written (most likely different from the buffer position). + * @param nextMarker The file position of where the next sync marker should be. + */ - protected void writeSyncMarker(ByteBuffer buffer, int offset, int filePos, int nextMarker) + protected static void writeSyncMarker(long id, ByteBuffer buffer, int offset, int filePos, int nextMarker) { + if (filePos > nextMarker) + throw new IllegalArgumentException(String.format("commit log sync marker's current file position %d is greater than next file position %d", filePos, nextMarker)); CRC32 crc = new CRC32(); updateChecksumInt(crc, (int) (id & 0xFFFFFFFFL)); updateChecksumInt(crc, (int) (id >>> 32)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/be211749/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java index 288b766,c00ce18..967db15 --- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java @@@ -77,8 -134,8 +77,8 @@@ public class CompressedSegment extends // Only one thread can be here at a given time. // Protected by synchronization on CommitLogSegment.sync(). - writeSyncMarker(compressedBuffer, 0, (int) channel.position(), (int) channel.position() + compressedBuffer.remaining()); + writeSyncMarker(id, compressedBuffer, 0, (int) channel.position(), (int) channel.position() + compressedBuffer.remaining()); - commitLog.allocator.addSize(compressedBuffer.limit()); + manager.addSize(compressedBuffer.limit()); channel.write(compressedBuffer); assert channel.position() - lastWrittenPos == compressedBuffer.limit(); lastWrittenPos = channel.position(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/be211749/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java index 4ca1ede,0000000..87825ab mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java @@@ -1,159 -1,0 +1,159 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import javax.crypto.Cipher; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.compress.ICompressor; +import org.apache.cassandra.security.EncryptionUtils; +import org.apache.cassandra.security.EncryptionContext; +import org.apache.cassandra.utils.Hex; +import org.apache.cassandra.utils.SyncUtil; + +import static org.apache.cassandra.security.EncryptionUtils.ENCRYPTED_BLOCK_HEADER_SIZE; + +/** + * Writes encrypted segments to disk. Data is compressed before encrypting to (hopefully) reduce the size of the data into + * the encryption algorithms. + * + * The format of the encrypted commit log is as follows: + * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)}) + * - a series of 'sync segments' that are written every time the commit log is sync()'ed - * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)} ++ * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(long, ByteBuffer, int, int, int)} + * -- total plain text length for this section + * -- a series of encrypted data blocks, each of which contains: + * --- the length of the encrypted block (cipher text) + * --- the length of the unencrypted data (compressed text) + * --- the encrypted block, which contains: + * ---- the length of the plain text (raw) data + * ---- block of compressed data + * + * Notes: + * - "length of the unencrypted data" is different from the length of resulting decrypted buffer as encryption adds padding + * to the output buffer, and we need to ignore that padding when processing. 
+ */ +public class EncryptedSegment extends FileDirectSegment +{ + private static final Logger logger = LoggerFactory.getLogger(EncryptedSegment.class); + + private static final int ENCRYPTED_SECTION_HEADER_SIZE = SYNC_MARKER_SIZE + 4; + + private final EncryptionContext encryptionContext; + private final Cipher cipher; + + public EncryptedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager) + { + super(commitLog, manager); + this.encryptionContext = commitLog.configuration.getEncryptionContext(); + + try + { + cipher = encryptionContext.getEncryptor(); + } + catch (IOException e) + { + throw new FSWriteError(e, logFile); + } + logger.debug("created a new encrypted commit log segment: {}", logFile); + // Keep reusable buffers on-heap regardless of compression preference so we avoid copy off/on repeatedly during decryption + manager.getBufferPool().setPreferredReusableBufferType(BufferType.ON_HEAP); + } + + protected Map<String, String> additionalHeaderParameters() + { + Map<String, String> map = encryptionContext.toHeaderParameters(); + map.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(cipher.getIV())); + return map; + } + + ByteBuffer createBuffer(CommitLog commitLog) + { + // Note: we want to keep the compression buffers on-heap as we need those bytes for encryption, + // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs + return manager.getBufferPool().createBuffer(BufferType.ON_HEAP); + } + + void write(int startMarker, int nextMarker) + { + int contentStart = startMarker + SYNC_MARKER_SIZE; + final int length = nextMarker - contentStart; + // The length may be 0 when the segment is being closed. + assert length > 0 || length == 0 && !isStillAllocating(); + + final ICompressor compressor = encryptionContext.getCompressor(); + final int blockSize = encryptionContext.getChunkLength(); + try + { + ByteBuffer inputBuffer = buffer.duplicate(); + inputBuffer.limit(contentStart + length).position(contentStart); + ByteBuffer buffer = manager.getBufferPool().getThreadLocalReusableBuffer(DatabaseDescriptor.getCommitLogSegmentSize()); + + // save space for the sync marker at the beginning of this section + final long syncMarkerPosition = lastWrittenPos; + channel.position(syncMarkerPosition + ENCRYPTED_SECTION_HEADER_SIZE); + + // loop over the segment data in encryption buffer sized chunks + while (contentStart < nextMarker) + { + int nextBlockSize = nextMarker - blockSize > contentStart ? 
blockSize : nextMarker - contentStart; + ByteBuffer slice = inputBuffer.duplicate(); + slice.limit(contentStart + nextBlockSize).position(contentStart); + + buffer = EncryptionUtils.compress(slice, buffer, true, compressor); + + // reuse the same buffer for the input and output of the encryption operation + buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher); + + contentStart += nextBlockSize; + manager.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE); + } + + lastWrittenPos = channel.position(); + + // rewind to the beginning of the section and write out the sync marker + buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE); - writeSyncMarker(buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos); ++ writeSyncMarker(id, buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos); + buffer.putInt(SYNC_MARKER_SIZE, length); + buffer.rewind(); + manager.addSize(buffer.limit()); + + channel.position(syncMarkerPosition); + channel.write(buffer); + + SyncUtil.force(channel, true); + } + catch (Exception e) + { + throw new FSWriteError(e, getPath()); + } + } + + public long onDiskSize() + { + return lastWrittenPos; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/be211749/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java index 250b3e4,3a16d91..bbf9ad2 --- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java @@@ -76,10 -77,9 +76,10 @@@ public class MemoryMappedSegment extend // write previous sync marker to point to next sync marker // we don't chain the crcs here to ensure this method is idempotent if it fails - writeSyncMarker(buffer, startMarker, startMarker, nextMarker); + writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker); - try { + try + { SyncUtil.force((MappedByteBuffer) buffer); } catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it http://git-wip-us.apache.org/repos/asf/cassandra/blob/be211749/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java index aab55a5,9e9ee53..267813e --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java @@@ -28,9 -30,11 +28,10 @@@ import java.util.stream.Collectors import java.util.zip.CRC32; import java.util.zip.Checksum; +import com.google.common.collect.Iterables; + +import org.junit.*; + import com.google.common.io.Files; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; @@@ -155,12 -129,64 +156,64 @@@ public class CommitLogTes } @Test - public void testRecoveryWithEmptyFinalLog() throws Exception + public void testRecoveryWithFinalEmptyLog() throws Exception { // Even though it's empty, it's the last commitlog segment, so allowTruncation=true should allow it to pass - CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.current_version) }); + CommitLog.instance.recoverFiles(new File[]{tmpFile(CommitLogDescriptor.current_version)}); } + /** + * Since commit log segments can 
be allocated before they're needed, the commit log file with the highest + * id isn't necessarily the last log that we wrote to. We should remove header-only logs on recover so we + * can tolerate truncated logs. + */ + @Test + public void testHeaderOnlyFileFiltering() throws Exception + { + File directory = Files.createTempDir(); + - CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null); - CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 2, null); ++ CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, DatabaseDescriptor.getEncryptionContext()); ++ CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 2, null, DatabaseDescriptor.getEncryptionContext()); + + ByteBuffer buffer; + + // this has a header and malformed data + File file1 = new File(directory, desc1.fileName()); + buffer = ByteBuffer.allocate(1024); + CommitLogDescriptor.writeHeader(buffer, desc1); + int pos = buffer.position(); + CommitLogSegment.writeSyncMarker(desc1.id, buffer, buffer.position(), buffer.position(), buffer.position() + 128); + buffer.position(pos + 8); + buffer.putInt(5); + buffer.putInt(6); + + try (OutputStream lout = new FileOutputStream(file1)) + { + lout.write(buffer.array()); + } + + // this has only a header + File file2 = new File(directory, desc2.fileName()); + buffer = ByteBuffer.allocate(1024); + CommitLogDescriptor.writeHeader(buffer, desc2); + try (OutputStream lout = new FileOutputStream(file2)) + { + lout.write(buffer.array()); + } + + // one corrupt file and one header-only file should be ok + runExpecting(() -> { - CommitLog.instance.recover(file1, file2); ++ CommitLog.instance.recoverFiles(file1, file2); + return null; + }, null); + + // 2 corrupt files and one header-only file should fail + runExpecting(() -> { - CommitLog.instance.recover(file1, file1, file2); ++ CommitLog.instance.recoverFiles(file1, file1, file2); + return null; + }, CommitLogReplayException.class); + } + @Test public void testRecoveryWithEmptyLog20() throws Exception {
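For readers who want the new filtering check in isolation, here is a minimal standalone sketch of the same header-only test, using plain java.io instead of Cassandra's RandomAccessReader and CommitLogDescriptor plumbing. It is an illustration under stated assumptions, not the committed code: the class and method names and the explicit headerLength parameter are invented for the sketch, and the version exemption is omitted (in the commit, shouldSkip() derives the header length via CommitLogDescriptor.readHeader() and never skips pre-2.1 segments).

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;

    public final class HeaderOnlyLogCheck
    {
        // A freshly allocated, never-synced segment is zero-filled past its header,
        // so its first sync marker reads back as (nextMarkerPos == 0, crc == 0).
        static boolean looksHeaderOnly(File file, int headerLength) throws IOException
        {
            try (DataInputStream in = new DataInputStream(new FileInputStream(file)))
            {
                int remaining = headerLength;
                while (remaining > 0)
                {
                    int skipped = in.skipBytes(remaining);
                    if (skipped <= 0)
                        return false; // shorter than its own header; keep it and let recovery report it
                    remaining -= skipped;
                }
                int nextMarkerPos = in.readInt();      // first half of the sync marker (big-endian, like the reader)
                long crc = in.readInt() & 0xffffffffL; // second half, treated as unsigned
                return nextMarkerPos == 0 && crc == 0; // both zero: header-only, safe to drop before replay
            }
            catch (EOFException e)
            {
                // the file ends inside the marker itself; keep it, mirroring filterCommitLogFiles(),
                // which keeps a file on any exception ("let recover deal with it")
                return false;
            }
        }
    }

As in filterCommitLogFiles() above, a caller would drop only the files for which this check returns true and keep everything else, including files it cannot read, so that genuine corruption still surfaces during replay.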
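Relatedly, the CommitLogSegment hunk cuts off just after the two updateChecksumInt calls in the newly static writeSyncMarker(long, ByteBuffer, int, int, int). The sketch below fills in the rest of the marker write to show why the filter above is sound; the tail beyond the shown hunk is a plausible reconstruction, not quoted from the commit.

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    public final class SyncMarkerSketch
    {
        static void writeSyncMarker(long id, ByteBuffer buffer, int offset, int filePos, int nextMarker)
        {
            // guard added by this commit: a marker may never point backwards
            if (filePos > nextMarker)
                throw new IllegalArgumentException(String.format("commit log sync marker's current file position %d is greater than next file position %d", filePos, nextMarker));
            CRC32 crc = new CRC32();
            updateChecksumInt(crc, (int) (id & 0xFFFFFFFFL)); // low 32 bits of the segment id
            updateChecksumInt(crc, (int) (id >>> 32));        // high 32 bits of the segment id
            updateChecksumInt(crc, filePos);                  // ties the CRC to this marker's own position
            buffer.putInt(offset, nextMarker);                // 4 bytes: where the next marker will live
            buffer.putInt(offset + 4, (int) crc.getValue());  // 4 bytes: marker checksum
        }

        // big-endian int update, as in FBUtilities.updateChecksumInt
        private static void updateChecksumInt(CRC32 crc, int v)
        {
            crc.update((v >>> 24) & 0xFF);
            crc.update((v >>> 16) & 0xFF);
            crc.update((v >>> 8) & 0xFF);
            crc.update(v & 0xFF);
        }
    }

Because a live marker's nextMarker always points past the descriptor header, a (0, 0) pair at the first marker position can only mean the segment was allocated but never synced, which is exactly the condition shouldSkip() tests for.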