Make commitlog archive+restore more robust

patch by bes; reviewed by jbellis for CASSANDRA-6974
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/134e0226 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/134e0226 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/134e0226 Branch: refs/heads/cassandra-2.1 Commit: 134e0226e42d977e8e73477b1ff24d51e64b4436 Parents: a89fd4d Author: Jonathan Ellis <jbel...@apache.org> Authored: Thu May 8 15:56:22 2014 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Thu May 8 16:00:09 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../cassandra/db/commitlog/CommitLog.java | 4 +- .../db/commitlog/CommitLogArchiver.java | 30 +++++++-- .../db/commitlog/CommitLogDescriptor.java | 64 ++++++++++++++++++++ .../db/commitlog/CommitLogReplayer.java | 37 +++++++---- .../db/commitlog/CommitLogSegment.java | 57 +++++++++++++---- .../db/commitlog/CommitLogSegmentManager.java | 7 +-- .../apache/cassandra/utils/PureJavaCrc32.java | 17 +++++- 8 files changed, 178 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6d80937..1ebc050 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.0-rc1 + * Make commitlog archive+restore more robust (CASSANDRA-6974) * Fix marking commitlogsegments clean (CASSANDRA-6959) * Add snapshot "manifest" describing files included (CASSANDRA-6326) * Parallel streaming for sstableloader (CASSANDRA-3668) @@ -24,6 +25,7 @@ Merged from 1.2: * remove duplicate query for local tokens (CASSANDRA-7182) * exit CQLSH with error status code if script fails (CASSANDRA-6344) + 2.1.0-beta2 * Increase default CL space to 8GB (CASSANDRA-7031) * Add range tombstones to read repair digests (CASSANDRA-6863) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index a230e35..eaa1b3c 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -216,13 +216,13 @@ public class CommitLog implements CommitLogMBean // checksummed length dos.writeInt((int) size); checksum.update(buffer, buffer.position() - 4, 4); - buffer.putLong(checksum.getValue()); + buffer.putInt(checksum.getCrc()); int start = buffer.position(); // checksummed mutation Mutation.serializer.serialize(mutation, dos, MessagingService.current_version); checksum.update(buffer, start, (int) size); - buffer.putLong(checksum.getValue()); + buffer.putInt(checksum.getCrc()); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java index 6161435..d715fcc 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java @@ -103,17 +103,18 @@ public class CommitLogArchiver } } - public void maybeArchive(final String path, final String name) + public void maybeArchive(final CommitLogSegment segment) { if (Strings.isNullOrEmpty(archiveCommand)) return; - archivePending.put(name, executor.submit(new WrappedRunnable() + archivePending.put(segment.getName(), executor.submit(new WrappedRunnable() { protected void runMayThrow() throws IOException { - String command = 
archiveCommand.replace("%name", name); - command = command.replace("%path", path); + segment.waitForFinalSync(); + String command = archiveCommand.replace("%name", segment.getName()); + command = command.replace("%path", segment.getPath()); exec(command); } })); @@ -160,7 +161,26 @@ public class CommitLogArchiver } for (File fromFile : files) { - File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), new CommitLogDescriptor(CommitLogSegment.getNextId()).fileName()); + CommitLogDescriptor fromHeader = CommitLogDescriptor.fromHeader(fromFile); + CommitLogDescriptor fromName = CommitLogDescriptor.isValid(fromFile.getName()) ? CommitLogDescriptor.fromFileName(fromFile.getName()) : null; + CommitLogDescriptor descriptor; + if (fromHeader == null && fromName == null) + throw new IllegalStateException("Cannot safely construct descriptor for segment, either from its name or its header: " + fromFile.getPath()); + else if (fromHeader != null && fromName != null && !fromHeader.equals(fromName)) + throw new IllegalStateException(String.format("Cannot safely construct descriptor for segment, as name and header descriptors do not match (%s vs %s): %s", fromHeader, fromName, fromFile.getPath())); + else if (fromName != null && fromHeader == null && fromName.getVersion() >= CommitLogDescriptor.VERSION_21) + throw new IllegalStateException("Cannot safely construct descriptor for segment, as name descriptor implies a version that should contain a header descriptor, but that descriptor could not be read: " + fromFile.getPath()); + else if (fromHeader != null) + descriptor = fromHeader; + else descriptor = fromName; + + if (descriptor.getVersion() > CommitLogDescriptor.VERSION_21) + throw new IllegalStateException("Unsupported commit log version: " + descriptor.getVersion()); + + File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), descriptor.fileName()); + if (toFile.exists()) + throw new IllegalStateException("Trying to restore archive " + fromFile.getPath() 
+ ", but the same segment already exists in the restore location: " + toFile.getPath()); + String command = restoreCommand.replace("%from", fromFile.getPath()); command = command.replace("%to", toFile.getPath()); try http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java index 0c8ed61..b11da94 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java @@ -20,10 +20,18 @@ */ package org.apache.cassandra.db.commitlog; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.PureJavaCrc32; public class CommitLogDescriptor { @@ -42,6 +50,9 @@ public class CommitLogDescriptor */ public static final int current_version = VERSION_21; + // [version, id, checksum] + static final int HEADER_SIZE = 4 + 8 + 4; + private final int version; public final long id; @@ -56,6 +67,43 @@ public class CommitLogDescriptor this(current_version, id); } + static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor) + { + out.putInt(0, descriptor.version); + out.putLong(4, descriptor.id); + PureJavaCrc32 crc = new PureJavaCrc32(); + crc.updateInt(descriptor.version); + crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL)); + crc.updateInt((int) (descriptor.id >>> 32)); + out.putInt(12, crc.getCrc()); + } + + public static CommitLogDescriptor fromHeader(File file) + { + try 
(RandomAccessFile raf = new RandomAccessFile(file, "r")) + { + assert raf.getFilePointer() == 0; + int version = raf.readInt(); + long id = raf.readLong(); + int crc = raf.readInt(); + PureJavaCrc32 checkcrc = new PureJavaCrc32(); + checkcrc.updateInt(version); + checkcrc.updateInt((int) (id & 0xFFFFFFFFL)); + checkcrc.updateInt((int) (id >>> 32)); + if (crc == checkcrc.getCrc()) + return new CommitLogDescriptor(version, id); + return null; + } + catch (EOFException e) + { + return null; // file too short to hold a header: report "no valid header" exactly as for a CRC mismatch + } + catch (IOException e) + { + throw new FSReadError(e, file); + } + } + public static CommitLogDescriptor fromFileName(String name) { Matcher matcher; @@ -102,4 +150,26 @@ public class CommitLogDescriptor { return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches(); } + + public String toString() + { + return "(" + version + "," + id + ")"; + } + + public boolean equals(Object that) + { + return that instanceof CommitLogDescriptor && equals((CommitLogDescriptor) that); + } + + public boolean equals(CommitLogDescriptor that) + { + return this.version == that.version && this.id == that.id; + } + + // descriptors that are equal() must hash identically + public int hashCode() + { + return 31 * version + (int) (id ^ (id >>> 32)); + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index fb33187..59ae4e4 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -56,7 +56,7 @@ public class CommitLogReplayer private final AtomicInteger replayedCount; private final Map<UUID, ReplayPosition> cfPositions; private final ReplayPosition globalPosition; - private final Checksum checksum; + private final PureJavaCrc32 checksum; private byte[] buffer; public CommitLogReplayer() @@ -113,22 +113,26 @@ public class
CommitLogReplayer return replayedCount.get(); } - private int readHeader(long segmentId, int offset, RandomAccessReader reader) throws IOException + private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException { if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE) { if (offset != reader.length() && offset != Integer.MAX_VALUE) - logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header"); + logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header", offset, reader.getPath()); // cannot possibly be a header here. if we're == length(), assume it's a correctly written final segment return -1; } reader.seek(offset); PureJavaCrc32 crc = new PureJavaCrc32(); - crc.update((int) (segmentId & 0xFFFFFFFFL)); - crc.update((int) (segmentId >>> 32)); - crc.update((int) reader.getPosition()); + crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL)); + crc.updateInt((int) (descriptor.id >>> 32)); + crc.updateInt((int) reader.getPosition()); int end = reader.readInt(); - long filecrc = reader.readLong(); + long filecrc; + if (descriptor.getVersion() < CommitLogDescriptor.VERSION_21) + filecrc = reader.readLong(); + else + filecrc = reader.readInt() & 0xffffffffL; if (crc.getValue() != filecrc) { if (end != 0 || filecrc != 0) @@ -150,7 +154,7 @@ public class CommitLogReplayer if (globalPosition.segment < segmentId) { if (version >= CommitLogDescriptor.VERSION_21) - return CommitLogSegment.SYNC_MARKER_SIZE; + return CommitLogDescriptor.HEADER_SIZE + CommitLogSegment.SYNC_MARKER_SIZE; else return 0; } @@ -244,7 +248,7 @@ public class CommitLogReplayer return; } - int prevEnd = 0; + int prevEnd = CommitLogDescriptor.HEADER_SIZE; main: while (true) { @@ -253,7 +257,7 @@ public class CommitLogReplayer end = Integer.MAX_VALUE; else { - do { end = readHeader(segmentId, end, reader); } + do { end = readSyncMarker(desc, end, reader); } while (end < 
offset && end > prevEnd); } @@ -290,12 +294,16 @@ public class CommitLogReplayer if (serializedSize < 10) break main; - long claimedSizeChecksum = reader.readLong(); + long claimedSizeChecksum; + if (desc.getVersion() < CommitLogDescriptor.VERSION_21) + claimedSizeChecksum = reader.readLong(); + else + claimedSizeChecksum = reader.readInt() & 0xffffffffL; checksum.reset(); if (desc.getVersion() < CommitLogDescriptor.VERSION_20) checksum.update(serializedSize); else - FBUtilities.updateChecksumInt(checksum, serializedSize); + checksum.updateInt(serializedSize); if (checksum.getValue() != claimedSizeChecksum) break main; // entry wasn't synced correctly/fully. that's @@ -304,7 +312,10 @@ public class CommitLogReplayer if (serializedSize > buffer.length) buffer = new byte[(int) (1.2 * serializedSize)]; reader.readFully(buffer, 0, serializedSize); - claimedCRC32 = reader.readLong(); + if (desc.getVersion() < CommitLogDescriptor.VERSION_21) + claimedCRC32 = reader.readLong(); + else + claimedCRC32 = reader.readInt() & 0xffffffffL; } catch (EOFException eof) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index 3830966..2120d3e 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -43,6 +43,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.CLibrary; @@ -59,14 +60,24 @@ public 
class CommitLogSegment { private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class); - private final static long idBase = System.currentTimeMillis(); + private final static long idBase; private final static AtomicInteger nextId = new AtomicInteger(1); + static + { + long maxId = Long.MIN_VALUE; + // listFiles() returns null if the directory does not exist or cannot be read + File[] existing = new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(); + for (File file : existing == null ? new File[0] : existing) + if (CommitLogDescriptor.isValid(file.getName())) + maxId = Math.max(CommitLogDescriptor.fromFileName(file.getName()).id, maxId); + idBase = Math.max(System.currentTimeMillis(), maxId + 1); + } - // The commit log entry overhead in bytes (int: length + long: head checksum + long: tail checksum) - static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8; + // The commit log entry overhead in bytes (int: length + int: head checksum + int: tail checksum) + static final int ENTRY_OVERHEAD_SIZE = 4 + 4 + 4; - // The commit log (chained) sync marker/header size in bytes (int: length + long: checksum [segmentId, position]) - static final int SYNC_MARKER_SIZE = 4 + 8; + // The commit log (chained) sync marker/header size in bytes (int: length + int: checksum [segmentId, position]) + static final int SYNC_MARKER_SIZE = 4 + 4; // The OpOrder used to order appends wrt sync private final OpOrder appendOrder = new OpOrder(); @@ -154,10 +165,13 @@ public class CommitLogSegment fd = CLibrary.getfd(logFileAccessor.getFD()); buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize()); - // mark the initial header as uninitialised - buffer.putInt(0, 0); - buffer.putLong(4, 0); - allocatePosition.set(SYNC_MARKER_SIZE); + // write the header + CommitLogDescriptor.writeHeader(buffer, descriptor); + // mark the initial sync marker as uninitialised + buffer.putInt(CommitLogDescriptor.HEADER_SIZE, 0); + buffer.putInt(CommitLogDescriptor.HEADER_SIZE + 4, 0); + allocatePosition.set(CommitLogDescriptor.HEADER_SIZE +
SYNC_MARKER_SIZE); + lastSyncedOffset = CommitLogDescriptor.HEADER_SIZE; } catch (IOException e) { @@ -292,11 +306,11 @@ public class CommitLogSegment // we don't chain the crcs here to ensure this method is idempotent if it fails int offset = lastSyncedOffset; final PureJavaCrc32 crc = new PureJavaCrc32(); - crc.update((int) (id & 0xFFFFFFFFL)); - crc.update((int) (id >>> 32)); - crc.update(offset); + crc.updateInt((int) (id & 0xFFFFFFFFL)); + crc.updateInt((int) (id >>> 32)); + crc.updateInt(offset); buffer.putInt(offset, nextMarker); - buffer.putLong(offset + 4, crc.getValue()); + buffer.putInt(offset + 4, crc.getCrc()); // zero out the next sync marker so replayer can cleanly exit if (nextMarker < buffer.capacity()) @@ -383,6 +397,23 @@ public class CommitLogSegment return logFile.getName(); } + void waitForFinalSync() + { + while (true) + { + WaitQueue.Signal signal = syncComplete.register(); + if (lastSyncedOffset < buffer.capacity()) + { + signal.awaitUninterruptibly(); + } + else + { + signal.cancel(); + break; + } + } + } + /** * Close the segment file. 
*/ http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java index b0be42c..5802e8a 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java @@ -21,7 +21,6 @@ import java.io.File; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -32,7 +31,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.*; @@ -222,7 +220,7 @@ public class CommitLogSegmentManager { // Now we can run the user defined command just after switching to the new commit log. // (Do this here instead of in the recycle call so we can get a head start on the archive.) 
- CommitLog.instance.archiver.maybeArchive(old.getPath(), old.getName()); + CommitLog.instance.archiver.maybeArchive(old); // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it old.discardUnusedTail(); @@ -314,8 +312,9 @@ public class CommitLogSegmentManager */ void recycleSegment(final CommitLogSegment segment) { + boolean archiveSuccess = CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName()); activeSegments.remove(segment); - if (!CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName())) + if (!archiveSuccess) { // if archiving (command) was not successful then leave the file alone. don't delete or recycle. discardSegment(segment, false); http://git-wip-us.apache.org/repos/asf/cassandra/blob/134e0226/src/java/org/apache/cassandra/utils/PureJavaCrc32.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/PureJavaCrc32.java b/src/java/org/apache/cassandra/utils/PureJavaCrc32.java index 041652f..9a1ac02 100644 --- a/src/java/org/apache/cassandra/utils/PureJavaCrc32.java +++ b/src/java/org/apache/cassandra/utils/PureJavaCrc32.java @@ -31,7 +31,7 @@ import java.util.zip.Checksum; * * @see java.util.zip.CRC32 * - * This class is copied from hadoop-commons project. + * This class is copied from hadoop-commons project and retains that formatting. 
* (The initial patch added PureJavaCrc32 was HADOOP-6148) */ public class PureJavaCrc32 implements Checksum { @@ -49,7 +49,11 @@ public class PureJavaCrc32 implements Checksum { return (~crc) & 0xffffffffL; } - @Override + public int getCrc() { + return ~crc; + } + + @Override public void reset() { crc = 0xffffffff; } @@ -172,7 +176,14 @@ public class PureJavaCrc32 implements Checksum { crc = (crc >>> 8) ^ T[T8_0_start + ((crc ^ b) & 0xff)]; } - /* + final public void updateInt(int v) { + update((v >>> 24) & 0xFF); + update((v >>> 16) & 0xFF); + update((v >>> 8) & 0xFF); + update((v >>> 0) & 0xFF); + } + + /* * CRC-32 lookup tables generated by the polynomial 0xEDB88320. * See also TestPureJavaCrc32.Table. */