Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a2399d4d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a2399d4d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a2399d4d Branch: refs/heads/trunk Commit: a2399d4d309ac6b60a150ea20af8dc6f006d51ff Parents: 2c111d1 44f79bf Author: Jeff Jirsa <j...@jeffjirsa.net> Authored: Sun Mar 12 21:56:11 2017 -0700 Committer: Jeff Jirsa <j...@jeffjirsa.net> Committed: Sun Mar 12 21:57:25 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/commitlog/CommitLogReader.java | 12 ++++++++++++ .../apache/cassandra/db/commitlog/CommitLogTest.java | 15 ++++++++++++++- 3 files changed, 27 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2399d4d/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 302a028,140c860..ab28dd4 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -33,140 -43,6 +33,141 @@@ Merged from 3.0 live rows in sstabledump (CASSANDRA-13177) * Provide user workaround when system_schema.columns does not contain entries for a table that's in system_schema.tables (CASSANDRA-13180) +Merged from 2.2: ++ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282) + * Fix queries updating multiple time the same list (CASSANDRA-13130) + * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053) + * Fix flaky LongLeveledCompactionStrategyTest (CASSANDRA-12202) + * Fix failing COPY TO STDOUT (CASSANDRA-12497) + * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222) + * Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018) + * Fix negative mean latency metric (CASSANDRA-12876) + * Use only one file pointer when creating commitlog 
segments (CASSANDRA-12539) +Merged from 2.1: + * Remove unused repositories (CASSANDRA-13278) + * Log stacktrace of uncaught exceptions (CASSANDRA-13108) + * Use portable stderr for java error in startup (CASSANDRA-13211) + * Fix Thread Leak in OutboundTcpConnection (CASSANDRA-13204) + * Coalescing strategy can enter infinite loop (CASSANDRA-13159) + + +3.10 + * Fix secondary index queries regression (CASSANDRA-13013) + * Add duration type to the protocol V5 (CASSANDRA-12850) + * Fix duration type validation (CASSANDRA-13143) + * Fix flaky GcCompactionTest (CASSANDRA-12664) + * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058) + * Fixed query monitoring for range queries (CASSANDRA-13050) + * Remove outboundBindAny configuration property (CASSANDRA-12673) + * Use correct bounds for all-data range when filtering (CASSANDRA-12666) + * Remove timing window in test case (CASSANDRA-12875) + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945) + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919) + * Fix validation of non-frozen UDT cells (CASSANDRA-12916) + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903) + * Fix Murmur3PartitionerTest (CASSANDRA-12858) + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897) + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283) + * Fix cassandra-stress truncate option (CASSANDRA-12695) + * Fix crossNode value when receiving messages (CASSANDRA-12791) + * Don't load MX4J beans twice (CASSANDRA-12869) + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838) + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836) + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845) + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454) + * Optimize the vnode allocation for single replica per 
DC (CASSANDRA-12777) + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419) + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803) + * Use different build directories for Eclipse and Ant (CASSANDRA-12466) + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815) + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812) + * Upgrade commons-codec to 1.9 (CASSANDRA-12790) + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550) + * Add duration data type (CASSANDRA-11873) + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784) + * Improve sum aggregate functions (CASSANDRA-12417) + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761) + * cqlsh fails to format collections when using aliases (CASSANDRA-11534) + * Check for hash conflicts in prepared statements (CASSANDRA-12733) + * Exit query parsing upon first error (CASSANDRA-12598) + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729) + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450) + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199) + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461) + * Add hint delivery metrics (CASSANDRA-12693) + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731) + * ColumnIndex does not reuse buffer (CASSANDRA-12502) + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697) + * Upgrade metrics-reporter dependencies (CASSANDRA-12089) + * Tune compaction thread count via nodetool (CASSANDRA-12248) + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232) + * Include repair session IDs in repair start message (CASSANDRA-12532) + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039) + * Fix NPE when 
using CQLSSTableWriter (CASSANDRA-12667) + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318) + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647) + * Fix cassandra-stress graphing (CASSANDRA-12237) + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031) + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585) + * Add JMH benchmarks.jar (CASSANDRA-12586) + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567) + * Add keep-alive to streaming (CASSANDRA-11841) + * Tracing payload is passed through newSession(..) (CASSANDRA-11706) + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261) + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486) + * Retry all internode messages once after a connection is + closed and reopened (CASSANDRA-12192) + * Add support to rebuild from targeted replica (CASSANDRA-9875) + * Add sequence distribution type to cassandra stress (CASSANDRA-12490) + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154) + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474) + * Extend read/write failure messages with a map of replica addresses + to error codes in the v5 native protocol (CASSANDRA-12311) + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374) + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550) + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378) + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223) + * Added slow query log (CASSANDRA-12403) + * Count full coordinated request against timeout (CASSANDRA-12256) + * Allow TTL with null value on insert and update (CASSANDRA-12216) + * Make decommission operation resumable (CASSANDRA-12008) + * Add support to one-way targeted repair (CASSANDRA-9876) + * Remove clientutil jar 
(CASSANDRA-11635) + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717) + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358) + * Cassandra stress should dump all setting on startup (CASSANDRA-11914) + * Make it possible to compact a given token range (CASSANDRA-10643) + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179) + * Collect metrics on queries by consistency level (CASSANDRA-7384) + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707) + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228) + * Upgrade to OHC 0.4.4 (CASSANDRA-12133) + * Add version command to cassandra-stress (CASSANDRA-12258) + * Create compaction-stress tool (CASSANDRA-11844) + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019) + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142) + * Support filtering on non-PRIMARY KEY columns in the CREATE + MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368) + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004) + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174) + * Faster write path (CASSANDRA-12269) + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424) + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035) + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635) + * Prepend snapshot name with "truncated" or "dropped" when a snapshot + is taken before truncating or dropping a table (CASSANDRA-12178) + * Optimize RestrictionSet (CASSANDRA-12153) + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150) + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613) + * Create a system table to expose prepared statements (CASSANDRA-8831) + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970) + * Remove DatabaseDescriptor dependency 
from SegmentedFile (CASSANDRA-11580) + * Add supplied username to authentication error messages (CASSANDRA-12076) + * Remove pre-startup check for open JMX port (CASSANDRA-12074) + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738) + * Restore resumable hints delivery (CASSANDRA-11960) + * Properly report LWT contention (CASSANDRA-12626) +Merged from 3.0: * Dump threads when unit tests time out (CASSANDRA-13117) * Better error when modifying function permissions without explicit keyspace (CASSANDRA-12925) * Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2399d4d/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java index e6e2e1a,0000000..d1cb8d6 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java @@@ -1,502 -1,0 +1,514 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.commitlog; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.zip.CRC32; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.UnknownColumnFamilyException; +import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadErrorReason; +import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.SerializationHelper; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.io.util.RebufferingInputStream; +import org.apache.cassandra.utils.JVMStabilityInspector; + +import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; + +public class CommitLogReader +{ + private static final Logger logger = LoggerFactory.getLogger(CommitLogReader.class); + + private static final int LEGACY_END_OF_SEGMENT_MARKER = 0; + + @VisibleForTesting + public static final int ALL_MUTATIONS = -1; + private final CRC32 checksum; + private final Map<UUID, AtomicInteger> invalidMutations; + + private byte[] buffer; + + public CommitLogReader() + { + checksum = new CRC32(); + invalidMutations = new HashMap<>(); + buffer = new byte[4096]; + } + + public Set<Map.Entry<UUID, AtomicInteger>> getInvalidMutations() + { + return invalidMutations.entrySet(); + } + + /** + * Reads all passed in files with no minimum, no start, and no mutation limit. 
+ */ + public void readAllFiles(CommitLogReadHandler handler, File[] files) throws IOException + { + readAllFiles(handler, files, CommitLogPosition.NONE); + } + + /** + * Reads all passed in files with minPosition, no start, and no mutation limit. + */ + public void readAllFiles(CommitLogReadHandler handler, File[] files, CommitLogPosition minPosition) throws IOException + { + for (int i = 0; i < files.length; i++) + readCommitLogSegment(handler, files[i], minPosition, ALL_MUTATIONS, i + 1 == files.length); + } + + /** + * Reads passed in file fully + */ + public void readCommitLogSegment(CommitLogReadHandler handler, File file, boolean tolerateTruncation) throws IOException + { + readCommitLogSegment(handler, file, CommitLogPosition.NONE, ALL_MUTATIONS, tolerateTruncation); + } + + /** + * Reads passed in file fully, up to mutationLimit count + */ + @VisibleForTesting + public void readCommitLogSegment(CommitLogReadHandler handler, File file, int mutationLimit, boolean tolerateTruncation) throws IOException + { + readCommitLogSegment(handler, file, CommitLogPosition.NONE, mutationLimit, tolerateTruncation); + } + + /** + * Reads mutations from file, handing them off to handler + * @param handler Handler that will take action based on deserialized Mutations + * @param file CommitLogSegment file to read + * @param minPosition Optional minimum CommitLogPosition - all segments with id > or matching w/greater position will be read + * @param mutationLimit Optional limit on # of mutations to replay. Local ALL_MUTATIONS serves as marker to play all. 
+ * @param tolerateTruncation Whether or not we should allow truncation of this file or throw if EOF found + * + * @throws IOException + */ + public void readCommitLogSegment(CommitLogReadHandler handler, + File file, + CommitLogPosition minPosition, + int mutationLimit, + boolean tolerateTruncation) throws IOException + { + // just transform from the file name (no reading of headers) to determine version + CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); + + try(RandomAccessReader reader = RandomAccessReader.open(file)) + { + if (desc.version < CommitLogDescriptor.VERSION_21) + { + if (!shouldSkipSegmentId(file, desc, minPosition)) + { + if (minPosition.segmentId == desc.id) + reader.seek(minPosition.position); + ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation); + statusTracker.errorContext = desc.fileName(); + readSection(handler, reader, minPosition, (int) reader.length(), statusTracker, desc); + } + return; + } + + final long segmentIdFromFilename = desc.id; + try + { + // The following call can either throw or legitimately return null. For either case, we need to check + // desc outside this block and set it to null in the exception case. + desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext()); + } + catch (Exception e) + { + desc = null; + } + if (desc == null) + { + // don't care about whether or not the handler thinks we can continue. We can't w/out descriptor. 
+ handler.handleUnrecoverableError(new CommitLogReadException( + String.format("Could not read commit log descriptor in file %s", file), + CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR, + false)); + return; + } + + if (segmentIdFromFilename != desc.id) + { + if (handler.shouldSkipSegmentOnError(new CommitLogReadException(String.format( + "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentIdFromFilename, desc.id, file), + CommitLogReadErrorReason.RECOVERABLE_DESCRIPTOR_ERROR, + false))) + { + return; + } + } + + if (shouldSkipSegmentId(file, desc, minPosition)) + return; + + CommitLogSegmentReader segmentReader; + try + { + segmentReader = new CommitLogSegmentReader(handler, desc, reader, tolerateTruncation); + } + catch(Exception e) + { + handler.handleUnrecoverableError(new CommitLogReadException( + String.format("Unable to create segment reader for commit log file: %s", e), + CommitLogReadErrorReason.UNRECOVERABLE_UNKNOWN_ERROR, + tolerateTruncation)); + return; + } + + try + { + ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation); + for (CommitLogSegmentReader.SyncSegment syncSegment : segmentReader) + { + // Only tolerate truncation if we allow in both global and segment + statusTracker.tolerateErrorsInSection = tolerateTruncation & syncSegment.toleratesErrorsInSection; + + // Skip segments that are completely behind the desired minPosition + if (desc.id == minPosition.segmentId && syncSegment.endPosition < minPosition.position) + continue; + + statusTracker.errorContext = String.format("Next section at %d in %s", syncSegment.fileStartPosition, desc.fileName()); + + readSection(handler, syncSegment.input, minPosition, syncSegment.endPosition, statusTracker, desc); + if (!statusTracker.shouldContinue()) + break; + } + } + // Unfortunately AbstractIterator cannot throw a checked exception, so we check to see if a RuntimeException + // is wrapping an IOException. 
+ catch (RuntimeException re) + { + if (re.getCause() instanceof IOException) + throw (IOException) re.getCause(); + throw re; + } + logger.debug("Finished reading {}", file); + } + } + + /** + * Any segment with id >= minPosition.segmentId is a candidate for read. + */ + private boolean shouldSkipSegmentId(File file, CommitLogDescriptor desc, CommitLogPosition minPosition) + { + logger.debug("Reading {} (CL version {}, messaging version {}, compression {})", + file.getPath(), + desc.version, + desc.getMessagingVersion(), + desc.compression); + + if (minPosition.segmentId > desc.id) + { + logger.trace("Skipping read of fully-flushed {}", file); + return true; + } + return false; + } + + /** + * Reads a section of a file containing mutations + * + * @param handler Handler that will take action based on deserialized Mutations + * @param reader FileDataInput / logical buffer containing commitlog mutations + * @param minPosition CommitLogPosition indicating when we should start actively replaying mutations + * @param end logical numeric end of the segment being read + * @param statusTracker ReadStatusTracker with current state of mutation count, error state, etc + * @param desc Descriptor for CommitLog serialization + */ + private void readSection(CommitLogReadHandler handler, + FileDataInput reader, + CommitLogPosition minPosition, + int end, + ReadStatusTracker statusTracker, + CommitLogDescriptor desc) throws IOException + { + // seek rather than deserializing mutation-by-mutation to reach the desired minPosition in this SyncSegment + if (desc.id == minPosition.segmentId && reader.getFilePointer() < minPosition.position) + reader.seek(minPosition.position); + + while (statusTracker.shouldContinue() && reader.getFilePointer() < end && !reader.isEOF()) + { + long mutationStart = reader.getFilePointer(); + if (logger.isTraceEnabled()) + logger.trace("Reading mutation at {}", mutationStart); + + long claimedCRC32; + int serializedSize; + try + { ++ // We rely on reading 
serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end ++ // of a segment, which happens naturally due to the 0 padding of the empty segment on creation. ++ // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes ++ // from the end of the file, which means that we'll be unable to read a full int and instead ++ // read an EOF here ++ if(end - reader.getFilePointer() < 4) ++ { ++ logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing"); ++ statusTracker.requestTermination(); ++ return; ++ } ++ + // any of the reads may hit EOF + serializedSize = reader.readInt(); + if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER) + { + logger.trace("Encountered end of segment marker at {}", reader.getFilePointer()); + statusTracker.requestTermination(); + return; + } + + // Mutation must be at LEAST 10 bytes: + // 3 for a non-empty Keyspace + // 3 for a Key (including the 2-byte length from writeUTF/writeWithShortLength) + // 4 bytes for column count.
+ // This prevents the CRC check from being fooled by special-case garbage in the file; see CASSANDRA-2128 + if (serializedSize < 10) + { + if (handler.shouldSkipSegmentOnError(new CommitLogReadException( + String.format("Invalid mutation size %d at %d in %s", serializedSize, mutationStart, statusTracker.errorContext), + CommitLogReadErrorReason.MUTATION_ERROR, + statusTracker.tolerateErrorsInSection))) + { + statusTracker.requestTermination(); + } + return; + } + + long claimedSizeChecksum = CommitLogFormat.calculateClaimedChecksum(reader, desc.version); + checksum.reset(); + CommitLogFormat.updateChecksum(checksum, serializedSize, desc.version); + + if (checksum.getValue() != claimedSizeChecksum) + { + if (handler.shouldSkipSegmentOnError(new CommitLogReadException( + String.format("Mutation size checksum failure at %d in %s", mutationStart, statusTracker.errorContext), + CommitLogReadErrorReason.MUTATION_ERROR, + statusTracker.tolerateErrorsInSection))) + { + statusTracker.requestTermination(); + } + return; + } + + if (serializedSize > buffer.length) + buffer = new byte[(int) (1.2 * serializedSize)]; + reader.readFully(buffer, 0, serializedSize); + + claimedCRC32 = CommitLogFormat.calculateClaimedCRC32(reader, desc.version); + } + catch (EOFException eof) + { + if (handler.shouldSkipSegmentOnError(new CommitLogReadException( + String.format("Unexpected end of segment at %d in %s", mutationStart, statusTracker.errorContext), + CommitLogReadErrorReason.EOF, + statusTracker.tolerateErrorsInSection))) + { + statusTracker.requestTermination(); + } + return; + } + + checksum.update(buffer, 0, serializedSize); + if (claimedCRC32 != checksum.getValue()) + { + if (handler.shouldSkipSegmentOnError(new CommitLogReadException( + String.format("Mutation checksum failure at %d in %s", mutationStart, statusTracker.errorContext), + CommitLogReadErrorReason.MUTATION_ERROR, + statusTracker.tolerateErrorsInSection))) + { + statusTracker.requestTermination(); + } + continue; + } + + long
mutationPosition = reader.getFilePointer(); + readMutation(handler, buffer, serializedSize, minPosition, (int)mutationPosition, desc); + + // Only count this as a processed mutation if it is after our min as we suppress replay of mutations that + // are before this mark. + if (mutationPosition >= minPosition.position) + statusTracker.addProcessedMutation(); + } + } + + /** + * Deserializes and passes a Mutation to the requested CommitLogReadHandler + * + * @param handler Handler that will take action based on deserialized Mutations + * @param inputBuffer raw byte array w/Mutation data + * @param size serialized size of the mutation, in bytes, within inputBuffer + * @param minPosition We need to suppress replay of mutations that are before the required minPosition + * @param entryLocation filePointer offset of mutation within CommitLogSegment + * @param desc CommitLogDescriptor being worked on + */ + @VisibleForTesting + protected void readMutation(CommitLogReadHandler handler, + byte[] inputBuffer, + int size, + CommitLogPosition minPosition, + final int entryLocation, + final CommitLogDescriptor desc) throws IOException + { + // For now, we need to go through the motions of deserializing the mutation to determine its size and move + // the file pointer forward accordingly, even if we're behind the requested minPosition within this SyncSegment.
+ boolean shouldReplay = entryLocation > minPosition.position; + + final Mutation mutation; + try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size)) + { + mutation = Mutation.serializer.deserialize(bufIn, + desc.getMessagingVersion(), + SerializationHelper.Flag.LOCAL); + // double-check that what we read is still valid for the current schema + for (PartitionUpdate upd : mutation.getPartitionUpdates()) + upd.validate(); + } + catch (UnknownColumnFamilyException ex) + { + if (ex.cfId == null) + return; + AtomicInteger i = invalidMutations.get(ex.cfId); + if (i == null) + { + i = new AtomicInteger(1); + invalidMutations.put(ex.cfId, i); + } + else + i.incrementAndGet(); + return; + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + File f = File.createTempFile("mutation", "dat"); + + try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f))) + { + out.write(inputBuffer, 0, size); + } + + // Checksum passed so this error can't be permissible. + handler.handleUnrecoverableError(new CommitLogReadException( + String.format( + "Unexpected error deserializing mutation; saved to %s. " + + "This may be caused by replaying a mutation against a table with the same name but incompatible schema. " + + "Exception follows: %s", f.getAbsolutePath(), t), + CommitLogReadErrorReason.MUTATION_ERROR, + false)); + return; + } + + if (logger.isTraceEnabled()) + logger.trace("Read mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(), + "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}"); + + if (shouldReplay) + handler.handleMutation(mutation, size, entryLocation, desc); + } + + /** + * Helper methods to deal with changing formats of internals of the CommitLog without polluting deserialization code.
+ */ + private static class CommitLogFormat + { + public static long calculateClaimedChecksum(FileDataInput input, int commitLogVersion) throws IOException + { + switch (commitLogVersion) + { + case CommitLogDescriptor.VERSION_12: + case CommitLogDescriptor.VERSION_20: + return input.readLong(); + // Changed format in 2.1 + default: + return input.readInt() & 0xffffffffL; + } + } + + public static void updateChecksum(CRC32 checksum, int serializedSize, int commitLogVersion) + { + switch (commitLogVersion) + { + case CommitLogDescriptor.VERSION_12: + checksum.update(serializedSize); + break; + // Changed format in 2.0 + default: + updateChecksumInt(checksum, serializedSize); + break; + } + } + + public static long calculateClaimedCRC32(FileDataInput input, int commitLogVersion) throws IOException + { + switch (commitLogVersion) + { + case CommitLogDescriptor.VERSION_12: + case CommitLogDescriptor.VERSION_20: + return input.readLong(); + // Changed format in 2.1 + default: + return input.readInt() & 0xffffffffL; + } + } + } + + private static class ReadStatusTracker + { + private int mutationsLeft; + public String errorContext = ""; + public boolean tolerateErrorsInSection; + private boolean error; + + public ReadStatusTracker(int mutationLimit, boolean tolerateErrorsInSection) + { + this.mutationsLeft = mutationLimit; + this.tolerateErrorsInSection = tolerateErrorsInSection; + } + + public void addProcessedMutation() + { + if (mutationsLeft == ALL_MUTATIONS) + return; + --mutationsLeft; + } + + public boolean shouldContinue() + { + return !error && (mutationsLeft != 0 || mutationsLeft == ALL_MUTATIONS); + } + + public void requestTermination() + { + error = true; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2399d4d/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java index 
5476d03,90dc258..4000fbf --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java @@@ -171,10 -143,23 +171,23 @@@ public class CommitLogTes } @Test + public void testRecoveryWithShortPadding() throws Exception + { - // If we have 0-3 bytes remaining, commitlog replayer - // should pass, because there's insufficient room - // left in the segment for the legacy size marker. - testRecovery(new byte[1], null); - testRecovery(new byte[2], null); - testRecovery(new byte[3], null); ++ // If we have 0-3 bytes remaining, commitlog replayer ++ // should pass, because there's insufficient room ++ // left in the segment for the legacy size marker. ++ testRecovery(new byte[1], null); ++ testRecovery(new byte[2], null); ++ testRecovery(new byte[3], null); + } + + @Test public void testRecoveryWithShortSize() throws Exception { + byte[] data = new byte[5]; + data[3] = 1; // Not a legacy marker, give it a fake (short) size runExpecting(() -> { - testRecovery(new byte[2], CommitLogDescriptor.VERSION_20); + testRecovery(data, CommitLogDescriptor.VERSION_20); return null; }, CommitLogReplayException.class); }