add commitlog archiving and pitr patch by Vijay and jbellis for CASSANDRA-3690
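In short: when the active commitlog segment fills up, the configured archive_command is kicked off for it, and the segment is not recycled until archiving completes; on startup, any segments staged in restore_directories are copied back into the commitlog directory via restore_command and replayed up to restore_point_in_time. A minimal sketch of a working conf/commitlog_archiving.properties (the /backup path is hypothetical, and hard-linking with /bin/ln assumes /backup is on the same filesystem as the commitlog directory):

    archive_command=/bin/ln %path /backup/%name
    restore_command=cp -f %from %to
    restore_directories=/backup
    restore_point_in_time=2012:04:30 20:43:12

Leaving restore_point_in_time blank replays all archived mutations.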
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5923d329 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5923d329 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5923d329 Branch: refs/heads/trunk Commit: 5923d32959ff419821dbb7fb36114a0604324498 Parents: 044e17a Author: Jonathan Ellis <jbel...@apache.org> Authored: Mon Apr 9 10:01:50 2012 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Mon Apr 9 11:23:02 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + conf/cassandra.yaml | 8 + conf/commitlog_archiving.properties | 37 ++ src/java/org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 8 + .../apache/cassandra/db/commitlog/CommitLog.java | 253 ++------------ .../cassandra/db/commitlog/CommitLogAllocator.java | 20 +- .../cassandra/db/commitlog/CommitLogArchiver.java | 147 ++++++++ .../cassandra/db/commitlog/CommitLogMBean.java | 17 + .../cassandra/db/commitlog/CommitLogReplayer.java | 269 +++++++++++++++ .../cassandra/db/commitlog/CommitLogSegment.java | 16 +- src/java/org/apache/cassandra/utils/CLibrary.java | 33 +-- .../org/apache/cassandra/utils/FBUtilities.java | 34 ++- 13 files changed, 581 insertions(+), 264 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e47bb95..b80368a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 1.1.1-dev + * add support for commitlog archiving and point-in-time recovery + (CASSANDRA-3690) * update caches to use byte[] keys to reduce memory overhead (CASSANDRA-3966) * add column limit to cli (CASSANDRA-3012, 4098) * clean up and optimize DataOutputBuffer, used by CQL compression and http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index af6072a..611c7a4 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -150,6 +150,14 @@ saved_caches_directory: /var/lib/cassandra/saved_caches commitlog_sync: periodic commitlog_sync_period_in_ms: 10000 +# Configure the size of individual commitlog segment files. The +# default is 128 MB, which is almost always fine, but if you are +# archiving commitlog segments (see commitlog_archiving.properties), +# then you probably want a finer granularity of archiving; 16 MB +# is reasonable. +# +# commitlog_segment_size_in_mb: 128 + # any class that implements the SeedProvider interface and has a # constructor that takes a Map<String, String> of parameters will do. seed_provider: http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/conf/commitlog_archiving.properties ---------------------------------------------------------------------- diff --git a/conf/commitlog_archiving.properties b/conf/commitlog_archiving.properties new file mode 100644 index 0000000..b4dfd79 --- /dev/null +++ b/conf/commitlog_archiving.properties @@ -0,0 +1,37 @@ +# commitlog archiving configuration. Leave blank to disable. + +# Command to execute to archive a commitlog segment +# Parameters: %path => Fully qualified path of the segment to archive +# %name => Name of the commit log.
+# Example: archive_command=/bin/ln %path /backup/%name +# +# Limitation: *_command= expects a single command with arguments; STDOUT/STDIN +# redirection and chaining of multiple commands are not supported. If you +# need to run multiple commands, put them in a script and point to it here. +archive_command= + +# Command to execute to make an archived commitlog live again. +# Parameters: %from is the full path to an archived commitlog segment (from restore_directories) +# %to is the live commitlog directory +# Example: restore_command=cp -f %from %to +restore_command= + +# Directories to scan for recovery files; separate multiple directories with commas. +restore_directories= + +# Restore mutations created up to and including this timestamp. +# Format: yyyy:MM:dd HH:mm:ss, e.g. 2012:04:30 20:43:12 +# +# Note! Recovery will stop when the first client-supplied timestamp +# greater than this time is encountered. Since the order in which +# Cassandra receives mutations does not always strictly follow +# timestamp order, this may leave unrecovered some mutations whose +# timestamps are earlier than the point in time. +restore_point_in_time= http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 91b96f1..fe8eb84 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -100,6 +100,7 @@ public class Config public CommitLogSync commitlog_sync; public Double commitlog_sync_batch_window_in_ms; public Integer commitlog_sync_period_in_ms; + public int commitlog_segment_size_in_mb = 128; public String endpoint_snitch; public Boolean dynamic_snitch = true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 2eb89ba..259d6e8 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -782,6 +782,14 @@ public class DatabaseDescriptor return conf.commitlog_directory; } + /** + * size of commitlog segments to allocate + */ + public static int getCommitLogSegmentSize() + { + return conf.commitlog_segment_size_in_mb * 1024 * 1024; + } + public static String getSavedCachesLocation() { return conf.saved_caches_directory; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 69dd0b1..3c34772 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -23,31 +23,17 @@ import java.lang.management.ManagementFactory; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import
java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.zip.Checksum; -import com.google.common.collect.Ordering; - -import org.apache.cassandra.config.Schema; +import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.db.*; -import org.apache.cassandra.io.IColumnSerializer; import org.apache.cassandra.io.util.*; import org.apache.cassandra.net.MessagingService; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.Stage; -import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.io.util.FastByteArrayInputStream; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.PureJavaCrc32; -import org.apache.cassandra.utils.WrappedRunnable; -import org.cliffc.high_scale_lib.NonBlockingHashSet; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -58,9 +44,7 @@ import javax.management.ObjectName; */ public class CommitLog implements CommitLogMBean { - private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024; - - static final Logger logger = LoggerFactory.getLogger(CommitLog.class); + private static final Logger logger = LoggerFactory.getLogger(CommitLog.class); public static final CommitLog instance = new CommitLog(); @@ -68,11 +52,11 @@ public class CommitLog implements CommitLogMBean public final CommitLogAllocator allocator; + public final CommitLogArchiver archiver = new CommitLogArchiver(); + public static final int END_OF_SEGMENT_MARKER = 0; // this is written out at the end of a segment public static final int END_OF_SEGMENT_MARKER_SIZE = 4; // number of bytes of ^^^ - /** size of commitlog segments to allocate */ - public static final int SEGMENT_SIZE = 128 * 1024 * 1024; public CommitLogSegment activeSegment; private CommitLog() @@ -120,6 +104,8 @@ public class CommitLog implements CommitLogMBean */ public int recover() throws IOException { + archiver.maybeRestoreArchive(); + File[] files = new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(new FilenameFilter() { public boolean accept(File dir, String name) @@ -157,203 +143,19 @@ public class CommitLog implements CommitLogMBean * @param clogs the list of commit log files to replay * @return the number of mutations replayed */ - public int recover(File[] clogs) throws IOException + public int recover(File... clogs) throws IOException { - final Set<Table> tablesRecovered = new NonBlockingHashSet<Table>(); - List<Future<?>> futures = new ArrayList<Future<?>>(); - byte[] bytes = new byte[4096]; - Map<Integer, AtomicInteger> invalidMutations = new HashMap<Integer, AtomicInteger>(); - - // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference. 
- final AtomicInteger replayedCount = new AtomicInteger(); - - // compute per-CF and global replay positions - final Map<Integer, ReplayPosition> cfPositions = new HashMap<Integer, ReplayPosition>(); - for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) - { - // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call - // below: gRP will return NONE if there are no flushed sstables, which is important to have in the - // list (otherwise we'll just start replay from the first flush position that we do have, which is not correct). - ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables()); - cfPositions.put(cfs.metadata.cfId, rp); - } - final ReplayPosition globalPosition = Ordering.from(ReplayPosition.comparator).min(cfPositions.values()); - - Checksum checksum = new PureJavaCrc32(); - for (final File file : clogs) - { - logger.info("Replaying " + file.getPath()); - - final long segment = CommitLogSegment.idFromFilename(file.getName()); - - RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), true); - assert reader.length() <= Integer.MAX_VALUE; - - try - { - int replayPosition; - if (globalPosition.segment < segment) - replayPosition = 0; - else if (globalPosition.segment == segment) - replayPosition = globalPosition.position; - else - replayPosition = (int) reader.length(); - - if (replayPosition < 0 || replayPosition >= reader.length()) - { - // replayPosition > reader.length() can happen if some data gets flushed before it is written to the commitlog - // (see https://issues.apache.org/jira/browse/CASSANDRA-2285) - logger.debug("skipping replay of fully-flushed {}", file); - continue; - } - - reader.seek(replayPosition); - - if (logger.isDebugEnabled()) - logger.debug("Replaying " + file + " starting at " + reader.getFilePointer()); - - /* read the logs populate RowMutation and apply */ - while (!reader.isEOF()) - { - if (logger.isDebugEnabled()) - logger.debug("Reading mutation at " + reader.getFilePointer()); - - long claimedCRC32; - int serializedSize; - try - { - // any of the reads may hit EOF - serializedSize = reader.readInt(); - if (serializedSize == CommitLog.END_OF_SEGMENT_MARKER) - { - logger.debug("Encountered end of segment marker at " + reader.getFilePointer()); - break; - } - - // RowMutation must be at LEAST 10 bytes: - // 3 each for a non-empty Table and Key (including the 2-byte length from - // writeUTF/writeWithShortLength) and 4 bytes for column count. - // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128 - if (serializedSize < 10) - break; - long claimedSizeChecksum = reader.readLong(); - checksum.reset(); - checksum.update(serializedSize); - if (checksum.getValue() != claimedSizeChecksum) - break; // entry wasn't synced correctly/fully. that's ok. - - if (serializedSize > bytes.length) - bytes = new byte[(int) (1.2 * serializedSize)]; - reader.readFully(bytes, 0, serializedSize); - claimedCRC32 = reader.readLong(); - } - catch(EOFException eof) - { - break; // last CL entry didn't get completely written. that's ok. - } - - checksum.update(bytes, 0, serializedSize); - if (claimedCRC32 != checksum.getValue()) - { - // this entry must not have been fsynced. 
probably the rest is bad too, - // but just in case there is no harm in trying them (since we still read on an entry boundary) - continue; - } - - /* deserialize the commit log entry */ - FastByteArrayInputStream bufIn = new FastByteArrayInputStream(bytes, 0, serializedSize); - RowMutation rm = null; - try - { - // assuming version here. We've gone to lengths to make sure what gets written to the CL is in - // the current version. so do make sure the CL is drained prior to upgrading a node. - rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_, IColumnSerializer.Flag.LOCAL); - } - catch (UnknownColumnFamilyException ex) - { - AtomicInteger i = invalidMutations.get(ex.cfId); - if (i == null) - { - i = new AtomicInteger(1); - invalidMutations.put(ex.cfId, i); - } - else - i.incrementAndGet(); - continue; - } - - if (logger.isDebugEnabled()) - logger.debug(String.format("replaying mutation for %s.%s: %s", - rm.getTable(), - ByteBufferUtil.bytesToHex(rm.key()), - "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + "}")); - - final long entryLocation = reader.getFilePointer(); - final RowMutation frm = rm; - Runnable runnable = new WrappedRunnable() - { - public void runMayThrow() throws IOException - { - if (Schema.instance.getKSMetaData(frm.getTable()) == null) - return; - final Table table = Table.open(frm.getTable()); - RowMutation newRm = new RowMutation(frm.getTable(), frm.key()); - - // Rebuild the row mutation, omitting column families that a) have already been flushed, - // b) are part of a cf that was dropped. Keep in mind that the cf.name() is suspect. do every - // thing based on the cfid instead. - for (ColumnFamily columnFamily : frm.getColumnFamilies()) - { - if (Schema.instance.getCF(columnFamily.id()) == null) - // null means the cf has been dropped - continue; - - ReplayPosition rp = cfPositions.get(columnFamily.id()); - - // replay if current segment is newer than last flushed one or, if it is the last known - // segment, if we are after the replay position - if (segment > rp.segment || (segment == rp.segment && entryLocation > rp.position)) - { - newRm.add(columnFamily); - replayedCount.incrementAndGet(); - } - } - if (!newRm.isEmpty()) - { - Table.open(newRm.getTable()).apply(newRm, false); - tablesRecovered.add(table); - } - } - }; - futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable)); - if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT) - { - FBUtilities.waitOnFutures(futures); - futures.clear(); - } - } - } - finally - { - FileUtils.closeQuietly(reader); - logger.info("Finished reading " + file); - } - } - - for (Map.Entry<Integer, AtomicInteger> entry : invalidMutations.entrySet()) - logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %d", entry.getValue().intValue(), entry.getKey())); - - // wait for all the writes to finish on the mutation stage - FBUtilities.waitOnFutures(futures); - logger.debug("Finished waiting on mutations from recovery"); - - // flush replayed tables - futures.clear(); - for (Table table : tablesRecovered) - futures.addAll(table.flush()); - FBUtilities.waitOnFutures(futures); + CommitLogReplayer recovery = new CommitLogReplayer(); + recovery.recover(clogs); + return recovery.blockForWrites(); + } - return replayedCount.get(); + /** + * Perform recovery on a single commit log. 
+ */ + public void recover(String path) throws IOException + { + recover(new File(path)); } /** @@ -400,7 +202,7 @@ public class CommitLog implements CommitLogMBean public void add(RowMutation rm) throws IOException { long totalSize = RowMutation.serializer().serializedSize(rm, MessagingService.version_) + CommitLogSegment.ENTRY_OVERHEAD_SIZE; - if (totalSize > CommitLog.SEGMENT_SIZE) + if (totalSize > DatabaseDescriptor.getCommitLogSegmentSize()) { logger.warn("Skipping commitlog append of extremely large mutation ({} bytes)", totalSize); return; @@ -538,6 +340,19 @@ public class CommitLog implements CommitLogMBean activeSegment = allocator.fetchSegment(); } + public List<String> getActiveSegmentNames() + { + List<String> segmentNames = new ArrayList<String>(); + for (CommitLogSegment segment : allocator.getActiveSegments()) + segmentNames.add(segment.getName()); + return segmentNames; + } + + public List<String> getArchivingSegmentNames() + { + return new ArrayList<String>(archiver.archivePending.keySet()); + } + /** * Shuts down the threads used by the commit log, blocking until completion. */ @@ -565,7 +380,13 @@ public class CommitLog implements CommitLogMBean try { if (!activeSegment.hasCapacityFor(rowMutation)) + { + CommitLogSegment oldSegment = activeSegment; activateNextSegment(); + // Now we can run the user defined command just before switching to the new commit log. + // (Do this here instead of in the recycle call so we can get a head start on the archive.) + archiver.maybeArchive(oldSegment.getPath(), oldSegment.getName()); + } activeSegment.write(rowMutation); } catch (IOException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java index da30fe1..963e41e 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java @@ -145,13 +145,17 @@ public class CommitLogAllocator public void recycleSegment(final CommitLogSegment segment) { activeSegments.remove(segment); - + if (!CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName())) + { + // if archiving (command) was not successful then leave the file alone. don't delete or recycle. 
+ discardSegment(segment, false); + return; + } if (isCapExceeded()) { - discardSegment(segment); + discardSegment(segment, true); return; } - queue.add(new Runnable() { public void run() @@ -171,7 +175,7 @@ public class CommitLogAllocator public void recycleSegment(final File file) { // check against SEGMENT_SIZE avoids recycling odd-sized or empty segments from old C* versions and unit tests - if (isCapExceeded() || file.length() != CommitLog.SEGMENT_SIZE) + if (isCapExceeded() || file.length() != DatabaseDescriptor.getCommitLogSegmentSize()) { try { @@ -199,15 +203,15 @@ public class CommitLogAllocator * * @param segment segment to be discarded */ - private void discardSegment(final CommitLogSegment segment) + private void discardSegment(final CommitLogSegment segment, final boolean deleteFile) { - size.addAndGet(-CommitLog.SEGMENT_SIZE); + size.addAndGet(-DatabaseDescriptor.getCommitLogSegmentSize()); queue.add(new Runnable() { public void run() { - segment.discard(); + segment.discard(deleteFile); } }); } @@ -240,7 +244,7 @@ public class CommitLogAllocator */ private CommitLogSegment createFreshSegment() { - size.addAndGet(CommitLog.SEGMENT_SIZE); + size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize()); return internalAddReadySegment(CommitLogSegment.freshSegment()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java new file mode 100644 index 0000000..a2204c6 --- /dev/null +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java @@ -0,0 +1,147 @@ +package org.apache.cassandra.db.commitlog; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.*; + +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.WrappedRunnable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Strings; + +public class CommitLogArchiver +{ + private static final Logger logger = LoggerFactory.getLogger(CommitLogArchiver.class); + public final Map<String, Future<?>> archivePending = new ConcurrentHashMap<String, Future<?>>(); + public final ExecutorService executor = new JMXEnabledThreadPoolExecutor("commitlog_archiver"); + private final String archiveCommand; + private final String restoreCommand; + private final String restoreDirectories; + public final long restorePointInTime; + + public CommitLogArchiver() + { + Properties commitlog_commands = new Properties(); + InputStream stream = null; + try + { + stream = getClass().getClassLoader().getResourceAsStream("commitlog_archiving.properties"); + + if (stream == null) + { + logger.debug("No commitlog_archiving properties found; archive + pitr will be disabled"); + archiveCommand = null; + restoreCommand = null; + restoreDirectories = null; + restorePointInTime = Long.MAX_VALUE; + } + else + { + commitlog_commands.load(stream); + archiveCommand = 
commitlog_commands.getProperty("archive_command"); + restoreCommand = commitlog_commands.getProperty("restore_command"); + restoreDirectories = commitlog_commands.getProperty("restore_directories"); + String targetTime = commitlog_commands.getProperty("restore_point_in_time"); + try + { + restorePointInTime = Strings.isNullOrEmpty(targetTime) ? Long.MAX_VALUE : new SimpleDateFormat("yyyy:MM:dd HH:mm:ss").parse(targetTime).getTime(); + } + catch (ParseException e) + { + throw new RuntimeException("Unable to parse restore target time", e); + } + } + } + catch (IOException e) + { + throw new RuntimeException("Unable to load commitlog_archiving.properties", e); + } + finally + { + FileUtils.closeQuietly(stream); + } + } + + public void maybeArchive(final String path, final String name) + { + if (Strings.isNullOrEmpty(archiveCommand)) + return; + + archivePending.put(name, executor.submit(new WrappedRunnable() + { + protected void runMayThrow() throws IOException + { + String command = archiveCommand.replace("%name", name); + command = command.replace("%path", path); + exec(command); + } + })); + } + + public boolean maybeWaitForArchiving(String name) + { + Future<?> f = archivePending.remove(name); + if (f == null) + return true; // archiving disabled + + try + { + f.get(); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + catch (ExecutionException e) + { + if (e.getCause() instanceof IOException) + { + logger.info("Looks like the archiving of file {} failed earlier, cassandra is going to ignore this segment for now.", name); + return false; + } + throw new RuntimeException(e); + } + + return true; + } + + public void maybeRestoreArchive() throws IOException + { + if (Strings.isNullOrEmpty(restoreDirectories)) + return; + + for (String dir : restoreDirectories.split(",")) + { + File[] files = new File(dir).listFiles(); + for (File fromFile : files) + { + File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), + CommitLogSegment.FILENAME_PREFIX + + System.nanoTime() + + CommitLogSegment.FILENAME_EXTENSION); + String command = restoreCommand.replace("%from", fromFile.getPath()); + command = command.replace("%to", toFile.getPath()); + exec(command); + } + } + } + + private void exec(String command) throws IOException + { + ProcessBuilder pb = new ProcessBuilder(command.split(" ")); + pb.redirectErrorStream(true); + FBUtilities.exec(pb); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java index 8f8fa19..29f95a7 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java @@ -19,6 +19,8 @@ package org.apache.cassandra.db.commitlog; +import java.io.IOException; +import java.util.List; public interface CommitLogMBean { @@ -36,4 +38,19 @@ public interface CommitLogMBean * Get the current size used by all the commitlog segments. */ public long getTotalCommitlogSize(); + + /** + * Recover a single file. + */ + public void recover(String path) throws IOException; + + /** + * @return file names (not full paths) of active commit log segments (segments containing unflushed data) + */ + public List<String> getActiveSegmentNames(); + + /** + * @return Files which are pending for archival attempt. 
Does NOT include failed archive attempts. + */ + public List<String> getArchivingSegmentNames(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java new file mode 100644 index 0000000..eb997fc --- /dev/null +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -0,0 +1,269 @@ +package org.apache.cassandra.db.commitlog; + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.zip.Checksum; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.RowMutation; +import org.apache.cassandra.db.Table; +import org.apache.cassandra.db.UnknownColumnFamilyException; +import org.apache.cassandra.io.IColumnSerializer; +import org.apache.cassandra.io.util.FastByteArrayInputStream; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.PureJavaCrc32; +import org.apache.cassandra.utils.WrappedRunnable; +import org.apache.commons.lang.StringUtils; +import org.cliffc.high_scale_lib.NonBlockingHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Ordering; + +public class CommitLogReplayer +{ + private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class); + private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024; + + private final Set<Table> tablesRecovered; + private final List<Future<?>> futures; + private final Map<Integer, AtomicInteger> invalidMutations; + private final AtomicInteger replayedCount; + private final Map<Integer, ReplayPosition> cfPositions; + private final ReplayPosition globalPosition; + private final Checksum checksum; + private byte[] buffer; + + public CommitLogReplayer() + { + this.tablesRecovered = new NonBlockingHashSet<Table>(); + this.futures = new ArrayList<Future<?>>(); + this.buffer = new byte[4096]; + this.invalidMutations = new HashMap<Integer, AtomicInteger>(); + // count the number of replayed mutations. We don't really care about atomicity, but we need it to be a reference. + this.replayedCount = new AtomicInteger(); + // compute per-CF and global replay positions + this.cfPositions = new HashMap<Integer, ReplayPosition>(); + for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) + { + // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call + // below: gRP will return NONE if there are no flushed sstables, which is important to have in the + // list (otherwise we'll just start replay from the first flush position that we do have, which is not correct).
+ ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables()); + cfPositions.put(cfs.metadata.cfId, rp); + } + this.globalPosition = Ordering.from(ReplayPosition.comparator).min(cfPositions.values()); + this.checksum = new PureJavaCrc32(); + } + + public void recover(File[] clogs) throws IOException + { + for (final File file : clogs) + recover(file); + } + + public int blockForWrites() throws IOException + { + for (Map.Entry<Integer, AtomicInteger> entry : invalidMutations.entrySet()) + logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %d", entry.getValue().intValue(), entry.getKey())); + + // wait for all the writes to finish on the mutation stage + FBUtilities.waitOnFutures(futures); + logger.debug("Finished waiting on mutations from recovery"); + + // flush replayed tables + futures.clear(); + for (Table table : tablesRecovered) + futures.addAll(table.flush()); + FBUtilities.waitOnFutures(futures); + return replayedCount.get(); + } + + public void recover(File file) throws IOException + { + logger.info("Replaying " + file.getPath()); + final long segment = CommitLogSegment.idFromFilename(file.getName()); + RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), true); + assert reader.length() <= Integer.MAX_VALUE; + try + { + int replayPosition; + if (globalPosition.segment < segment) + replayPosition = 0; + else if (globalPosition.segment == segment) + replayPosition = globalPosition.position; + else + replayPosition = (int) reader.length(); + + if (replayPosition < 0 || replayPosition >= reader.length()) + { + // replayPosition > reader.length() can happen if some data gets flushed before it is written to the commitlog + // (see https://issues.apache.org/jira/browse/CASSANDRA-2285) + logger.debug("skipping replay of fully-flushed {}", file); + return; + } + + reader.seek(replayPosition); + + if (logger.isDebugEnabled()) + logger.debug("Replaying " + file + " starting at " + reader.getFilePointer()); + + /* read the logs, populate RowMutation and apply */ + while (!reader.isEOF()) + { + if (logger.isDebugEnabled()) + logger.debug("Reading mutation at " + reader.getFilePointer()); + + long claimedCRC32; + int serializedSize; + try + { + // any of the reads may hit EOF + serializedSize = reader.readInt(); + if (serializedSize == CommitLog.END_OF_SEGMENT_MARKER) + { + logger.debug("Encountered end of segment marker at " + reader.getFilePointer()); + break; + } + + // RowMutation must be at LEAST 10 bytes: + // 3 each for a non-empty Table and Key (including the + // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count. + // This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128 + if (serializedSize < 10) + break; + long claimedSizeChecksum = reader.readLong(); + checksum.reset(); + checksum.update(serializedSize); + if (checksum.getValue() != claimedSizeChecksum) + break; // entry wasn't synced correctly/fully. that's ok. + + if (serializedSize > buffer.length) + buffer = new byte[(int) (1.2 * serializedSize)]; + reader.readFully(buffer, 0, serializedSize); + claimedCRC32 = reader.readLong(); + } + catch (EOFException eof) + { + break; // last CL entry didn't get completely written. that's ok. + } + + checksum.update(buffer, 0, serializedSize); + if (claimedCRC32 != checksum.getValue()) + { + // this entry must not have been fsynced.
probably the rest is bad too, + // but just in case there is no harm in trying them (since we still read on an entry boundary) + continue; + } + + /* deserialize the commit log entry */ + FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize); + RowMutation rm; + try + { + // assuming version here. We've gone to lengths to make sure what gets written to the CL is in + // the current version. so do make sure the CL is drained prior to upgrading a node. + rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_, IColumnSerializer.Flag.LOCAL); + } + catch (UnknownColumnFamilyException ex) + { + AtomicInteger i = invalidMutations.get(ex.cfId); + if (i == null) + { + i = new AtomicInteger(1); + invalidMutations.put(ex.cfId, i); + } + else + i.incrementAndGet(); + continue; + } + + if (logger.isDebugEnabled()) + logger.debug(String.format("replaying mutation for %s.%s: %s", rm.getTable(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + + "}")); + + final long entryLocation = reader.getFilePointer(); + final RowMutation frm = rm; + Runnable runnable = new WrappedRunnable() + { + public void runMayThrow() throws IOException + { + if (Schema.instance.getKSMetaData(frm.getTable()) == null) + return; + if (pointInTimeExceeded(frm)) + return; + + final Table table = Table.open(frm.getTable()); + RowMutation newRm = new RowMutation(frm.getTable(), frm.key()); + + // Rebuild the row mutation, omitting column families that + // a) have already been flushed, + // b) are part of a cf that was dropped. Keep in mind that the cf.name() is suspect. do everything based on the cfid instead. + for (ColumnFamily columnFamily : frm.getColumnFamilies()) + { + if (Schema.instance.getCF(columnFamily.id()) == null) + // null means the cf has been dropped + continue; + + ReplayPosition rp = cfPositions.get(columnFamily.id()); + + // replay if current segment is newer than last flushed one or, + // if it is the last known segment, if we are after the replay position + if (segment > rp.segment || (segment == rp.segment && entryLocation > rp.position)) + { + newRm.add(columnFamily); + replayedCount.incrementAndGet(); + } + } + if (!newRm.isEmpty()) + { + Table.open(newRm.getTable()).apply(newRm, false); + tablesRecovered.add(table); + } + } + }; + futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable)); + if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT) + { + FBUtilities.waitOnFutures(futures); + futures.clear(); + } + } + } + finally + { + FileUtils.closeQuietly(reader); + logger.info("Finished reading " + file); + } + } + + protected boolean pointInTimeExceeded(RowMutation frm) + { + long restoreTarget = CommitLog.instance.archiver.restorePointInTime; + + for (ColumnFamily families : frm.getColumnFamilies()) + { + if (families.maxTimestamp() > restoreTarget) + return true; + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index 45408ec..2ee90ad 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -53,9 +53,9 @@ public class CommitLogSegment { private static final
Logger logger = LoggerFactory.getLogger(CommitLogSegment.class); - private static final String FILENAME_PREFIX = "CommitLog-"; - private static final String FILENAME_EXTENSION = ".log"; - private static Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "(\\d+)" + FILENAME_EXTENSION); + static final String FILENAME_PREFIX = "CommitLog-"; + static final String FILENAME_EXTENSION = ".log"; + private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "(\\d+)" + FILENAME_EXTENSION); // The commit log entry overhead in bytes (int: length + long: head checksum + long: tail checksum) static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8; @@ -115,9 +115,9 @@ } // Map the segment, extending or truncating it to the standard segment size - logFileAccessor.setLength(CommitLog.SEGMENT_SIZE); + logFileAccessor.setLength(DatabaseDescriptor.getCommitLogSegmentSize()); - buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, CommitLog.SEGMENT_SIZE); + buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize()); buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER); buffer.position(0); @@ -163,12 +163,14 @@ /** * Completely discards a segment file by deleting it. (Potentially blocking operation) */ - public void discard() + public void discard(boolean deleteFile) { + // TODO shouldn't we close the file when we're done writing to it, which comes (potentially) much earlier than it's eligible for recycling? close(); try { - FileUtils.deleteWithConfirm(logFile); + if (deleteFile) + FileUtils.deleteWithConfirm(logFile); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/utils/CLibrary.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/CLibrary.java b/src/java/org/apache/cassandra/utils/CLibrary.java index 18140a2..d970d61 100644 --- a/src/java/org/apache/cassandra/utils/CLibrary.java +++ b/src/java/org/apache/cassandra/utils/CLibrary.java @@ -18,18 +18,14 @@ */ package org.apache.cassandra.utils; -import java.io.BufferedReader; import java.io.File; import java.io.FileDescriptor; import java.io.IOException; -import java.io.InputStreamReader; import java.lang.reflect.Field; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.commons.lang.StringUtils; - import com.sun.jna.LastErrorException; import com.sun.jna.Native; @@ -188,7 +184,7 @@ public final class CLibrary } try { - exec(pb); + FBUtilities.exec(pb); } catch (IOException ex) { @@ -197,33 +193,6 @@ } } - private static void exec(ProcessBuilder pb) throws IOException - { - Process p = pb.start(); - try - { - int errCode = p.waitFor(); - if (errCode != 0) - { - BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream())); - BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream())); - StringBuffer buff = new StringBuffer(); - String str; - while ((str = in.readLine()) != null) - buff.append(str).append(System.getProperty("line.separator")); - while ((str = err.readLine()) != null) - buff.append(str).append(System.getProperty("line.separator")); - throw new IOException("Exception while executing the command: "+ StringUtils.join(pb.command(), " ") + - ", command error Code: " + errCode + - ", command output: "+ buff.toString()); - } - } - catch
(InterruptedException e) - { - throw new RuntimeException(e); - } - } - public static void trySkipCache(int fd, long offset, int len) { if (fd < 0 http://git-wip-us.apache.org/repos/asf/cassandra/blob/5923d329/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index 38df00e..c0fe9bd 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.google.common.base.Joiner; import com.google.common.collect.AbstractIterator; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -320,7 +321,7 @@ public class FBUtilities public static String resourceToFile(String filename) throws ConfigurationException { - ClassLoader loader = PropertyFileSnitch.class.getClassLoader(); + ClassLoader loader = FBUtilities.class.getClassLoader(); URL scpurl = loader.getResource(filename); if (scpurl == null) throw new ConfigurationException("unable to locate " + filename); @@ -549,6 +550,37 @@ public class FBUtilities } } + /** + * Starts the process built by the given ProcessBuilder and waits for it to finish. + * @throws java.io.IOException on non-zero exit code + */ + public static void exec(ProcessBuilder pb) throws IOException + { + Process p = pb.start(); + try + { + int errCode = p.waitFor(); + if (errCode != 0) + { + BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream())); + BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream())); + StringBuilder sb = new StringBuilder(); + String str; + while ((str = in.readLine()) != null) + sb.append(str).append(System.getProperty("line.separator")); + while ((str = err.readLine()) != null) + sb.append(str).append(System.getProperty("line.separator")); + throw new IOException("Exception while executing the command: "+ StringUtils.join(pb.command(), " ") + + ", command error code: " + errCode + + ", command output: "+ sb.toString()); + } + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + } + private static final class WrappedCloseableIterator<T> extends AbstractIterator<T> implements CloseableIterator<T> {
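----------------------------------------------------------------------
For readers tracing the archiver changes above: the command execution path is small enough to sketch standalone. The following is a minimal, self-contained approximation of how CommitLogArchiver substitutes the %-tokens and how the new FBUtilities.exec surfaces failures; the class name, segment path, and backup target here are hypothetical illustrations, not part of the patch.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class ArchiveExecSketch
{
    // Approximates CommitLogArchiver.exec + FBUtilities.exec: run one command,
    // merge stderr into stdout, and turn a non-zero exit code into an IOException.
    static void exec(String command) throws IOException, InterruptedException
    {
        // A bare split(" ") means individual arguments cannot contain spaces,
        // which is why commitlog_archiving.properties expects a single simple command.
        ProcessBuilder pb = new ProcessBuilder(command.split(" "));
        pb.redirectErrorStream(true);
        Process p = pb.start();
        BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()));
        StringBuilder output = new StringBuilder();
        String line;
        while ((line = in.readLine()) != null)
            output.append(line).append(System.getProperty("line.separator"));
        int errCode = p.waitFor();
        if (errCode != 0)
            throw new IOException("Exception while executing the command: " + command +
                                  ", command error code: " + errCode +
                                  ", command output: " + output);
    }

    public static void main(String[] args) throws Exception
    {
        // Hypothetical values; in the patch these come from the active CommitLogSegment.
        String path = "/var/lib/cassandra/commitlog/CommitLog-1333994582023.log";
        String name = "CommitLog-1333994582023.log";
        String command = "/bin/ln %path /backup/%name".replace("%name", name).replace("%path", path);
        exec(command);
    }
}

Because maybeWaitForArchiving blocks segment recycling on the Future of this command, a slow or failing archive_command delays reuse of the segment rather than losing data; per the CommitLogAllocator change above, a failed archive leaves the segment file on disk untouched.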