Repository: cassandra Updated Branches: refs/heads/trunk 4838e81a6 -> e9da85723
Make CDC availability more deterministic via hard-linking Patch by jmckenzie; reviewed by blambov for CASSANDRA-12148 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e9da8572 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e9da8572 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e9da8572 Branch: refs/heads/trunk Commit: e9da85723a8dd40872c4bca087a03b655bd2cacb Parents: 4838e81 Author: Josh McKenzie <jmcken...@apache.org> Authored: Tue Jul 12 12:53:55 2016 -0400 Committer: Josh McKenzie <jmcken...@apache.org> Committed: Thu Aug 24 13:24:00 2017 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 6 +- doc/source/operating/cdc.rst | 31 +- .../cassandra/config/DatabaseDescriptor.java | 4 + .../AbstractCommitLogSegmentManager.java | 20 +- .../cassandra/db/commitlog/CommitLog.java | 8 +- .../db/commitlog/CommitLogDescriptor.java | 5 + .../db/commitlog/CommitLogPosition.java | 1 + .../db/commitlog/CommitLogReadHandler.java | 2 +- .../cassandra/db/commitlog/CommitLogReader.java | 10 +- .../db/commitlog/CommitLogReplayer.java | 57 ++- .../db/commitlog/CommitLogSegment.java | 60 +++- .../commitlog/CommitLogSegmentManagerCDC.java | 77 ++-- .../CommitLogSegmentManagerStandard.java | 14 +- .../cassandra/exceptions/CDCWriteException.java | 26 ++ .../cassandra/exceptions/ExceptionCode.java | 19 +- .../transport/messages/ErrorMessage.java | 5 + test/conf/cassandra.yaml | 2 + test/conf/cdc.yaml | 3 + .../cassandra/db/commitlog/CDCTestReplayer.java | 76 ++++ .../CommitLogSegmentManagerCDCTest.java | 351 +++++++++++++++---- 21 files changed, 617 insertions(+), 161 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt 
index 8f6fa48..cdcfe25 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -25,6 +25,7 @@ * Upgrade SLF4J from 1.7.7 to 1.7.25 (CASSANDRA-12996) * Default for start_native_transport now true if not set in config (CASSANDRA-13656) * Don't add localhost to the graph when calculating where to stream from (CASSANDRA-13583) + * Make CDC availability more deterministic via hard-linking (CASSANDRA-12148) * Allow skipping equality-restricted clustering columns in ORDER BY clause (CASSANDRA-10271) * Use common nowInSec for validation compactions (CASSANDRA-13671) * Improve handling of IR prepare failures (CASSANDRA-13672) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 253d773..2038342 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -25,7 +25,11 @@ New features - Support for arithmetic operations on number has been added. See CASSANDRA-11935 - Preview expected streaming required for a repair (nodetool repair --preview), and validate the consistency of repaired data between nodes (nodetool repair --validate). See CASSANDRA-13257 - - Support for selecting Map values and Set elements has been added for SELECT queries. See CASSANDRA-7396 + - Support for selecting Map values and Set elements has been added for SELECT queries. See CASSANDRA-7396 + - Change-Data-Capture has been modified to make CommitLogSegments available + immediately upon creation via hard-linking the files. This means that incomplete + segments will be available in cdc_raw rather than fully flushed. See documentation + and CASSANDRA-12148 for more detail. 
Upgrading --------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/doc/source/operating/cdc.rst ---------------------------------------------------------------------- diff --git a/doc/source/operating/cdc.rst b/doc/source/operating/cdc.rst index 192f62a..a7177b5 100644 --- a/doc/source/operating/cdc.rst +++ b/doc/source/operating/cdc.rst @@ -23,18 +23,26 @@ Overview ^^^^^^^^ Change data capture (CDC) provides a mechanism to flag specific tables for archival as well as rejecting writes to those -tables once a configurable size-on-disk for the combined flushed and unflushed CDC-log is reached. An operator can -enable CDC on a table by setting the table property ``cdc=true`` (either when :ref:`creating the table -<create-table-statement>` or :ref:`altering it <alter-table-statement>`), after which any CommitLogSegments containing -data for a CDC-enabled table are moved to the directory specified in ``cassandra.yaml`` on segment discard. A threshold -of total disk space allowed is specified in the yaml at which time newly allocated CommitLogSegments will not allow CDC -data until a consumer parses and removes data from the destination archival directory. +tables once a configurable size-on-disk for the CDC log is reached. An operator can enable CDC on a table by setting the +table property ``cdc=true`` (either when :ref:`creating the table <create-table-statement>` or +:ref:`altering it <alter-table-statement>`). Upon CommitLogSegment creation, a hard-link to the segment is created in the +directory specified in ``cassandra.yaml``. On segment fsync to disk, if CDC data is present anywhere in the segment a +<segment_name>_cdc.idx file is also created with the integer offset of how much data in the original segment is persisted +to disk. Upon final segment flush, a second line with the human-readable word "COMPLETED" will be added to the _cdc.idx +file indicating that Cassandra has completed all processing on the file. 
+ +We use an index file rather than just encouraging clients to parse the log realtime off a memory mapped handle as data +can be reflected in a kernel buffer that is not yet persisted to disk. Parsing only up to the listed offset in the _cdc.idx +file will ensure that you only parse CDC data for data that is durable. + +A threshold of total disk space allowed is specified in the yaml at which time newly allocated CommitLogSegments will +not allow CDC data until a consumer parses and removes files from the specified cdc_raw directory. Configuration ^^^^^^^^^^^^^ -Enabling or disable CDC on a table -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Enabling or disabling CDC on a table +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ CDC is enable or disable through the `cdc` table property, for instance:: @@ -64,7 +72,7 @@ The following `cassandra.yaml` are available for CDC: Reading CommitLogSegments ^^^^^^^^^^^^^^^^^^^^^^^^^ -This implementation included a refactor of CommitLogReplayer into `CommitLogReader.java +Use a `CommitLogReader.java <https://github.com/apache/cassandra/blob/e31e216234c6b57a531cae607e0355666007deb2/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java>`__. Usage is `fairly straightforward <https://github.com/apache/cassandra/blob/e31e216234c6b57a531cae607e0355666007deb2/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java#L132-L140>`__ @@ -78,12 +86,11 @@ Warnings **Do not enable CDC without some kind of consumption process in-place.** -The initial implementation of Change Data Capture does not include a parser (see :ref:`reading-commitlogsegments` above) -so, if CDC is enabled on a node and then on a table, the ``cdc_free_space_in_mb`` will fill up and then writes to +If CDC is enabled on a node and then on a table, the ``cdc_free_space_in_mb`` will fill up and then writes to CDC-enabled tables will be rejected unless some consumption process is in place.
Further Reading ^^^^^^^^^^^^^^^ -- `Design doc <https://docs.google.com/document/d/1ZxCWYkeZTquxsvf5hdPc0fiUnUHna8POvgt6TIzML4Y/edit>`__ - `JIRA ticket <https://issues.apache.org/jira/browse/CASSANDRA-8844>`__ +- `JIRA ticket <https://issues.apache.org/jira/browse/CASSANDRA-12148>`__ http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index a839224..f06b8e1 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -441,6 +441,10 @@ public class DatabaseDescriptor conf.cdc_raw_directory = storagedirFor("cdc_raw"); } + // Windows memory-mapped CommitLog files is incompatible with CDC as we hard-link files in cdc_raw. Confirm we don't have both enabled. 
+ if (FBUtilities.isWindows && conf.cdc_enabled && conf.commitlog_compression == null) + throw new ConfigurationException("Cannot enable cdc on Windows with uncompressed commitlog."); + if (conf.commitlog_total_space_in_mb == null) { int preferredSize = 8192; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java index 808ef37..42fffc4 100755 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import net.nicoulaj.compilecommand.annotations.DontInline; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.schema.TableId; @@ -180,19 +181,12 @@ public abstract class AbstractCommitLogSegmentManager } } - /** * Allocate a segment within this CLSM. Should either succeed or throw. */ public abstract Allocation allocate(Mutation mutation, int size); /** - * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM - * decide what to do with those segments on disk after they've been replayed. - */ - abstract void handleReplayedSegment(final File file); - - /** * Hook to allow segment managers to track state surrounding creation of new segments. Onl perform as task submit * to segment manager so it's performed on segment management thread. 
*/ @@ -332,6 +326,18 @@ public abstract class AbstractCommitLogSegmentManager } /** + * Delete untracked segment files after replay + * + * @param file segment file that is no longer in use. + */ + void handleReplayedSegment(final File file) + { + // (don't decrease managed size, since this was never a "live" segment) + logger.trace("(Unopened) segment {} is no longer needed and will be deleted now", file); + FileUtils.deleteWithConfirm(file); + } + + /** * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards. * @param addedSize */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index e93a131..a4978b1 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -34,7 +34,7 @@ import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.db.*; -import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.exceptions.CDCWriteException; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; @@ -239,9 +239,9 @@ public class CommitLog implements CommitLogMBean * Add a Mutation to the commit log. If CDC is enabled, this can fail. 
* * @param mutation the Mutation to add to the log - * @throws WriteTimeoutException + * @throws CDCWriteException */ - public CommitLogPosition add(Mutation mutation) throws WriteTimeoutException + public CommitLogPosition add(Mutation mutation) throws CDCWriteException { assert mutation != null; @@ -431,6 +431,7 @@ public class CommitLog implements CommitLogMBean } /** + * FOR TESTING PURPOSES */ public void stopUnsafe(boolean deleteSegments) { @@ -448,7 +449,6 @@ public class CommitLog implements CommitLogMBean if (DatabaseDescriptor.isCDCEnabled() && deleteSegments) for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles()) FileUtils.deleteWithConfirm(f); - } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java index dd9414a..700f12a 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java @@ -219,6 +219,11 @@ public class CommitLogDescriptor return FILENAME_PREFIX + version + SEPARATOR + id + FILENAME_EXTENSION; } + public String cdcIndexFileName() + { + return FILENAME_PREFIX + version + SEPARATOR + id + "_cdc.idx"; + } + /** * @param filename the filename to check * @return true if filename could be a commit log based on it's filename http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/CommitLogPosition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogPosition.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogPosition.java index 84054a4..3ffb04c 100644 --- 
a/src/java/org/apache/cassandra/db/commitlog/CommitLogPosition.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogPosition.java @@ -40,6 +40,7 @@ public class CommitLogPosition implements Comparable<CommitLogPosition> public static final CommitLogPosition NONE = new CommitLogPosition(-1, 0); public final long segmentId; + // Indicates the end position of the mutation in the CommitLog public final int position; public static final Comparator<CommitLogPosition> comparator = new Comparator<CommitLogPosition>() http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/CommitLogReadHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReadHandler.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReadHandler.java index 0602147..ee05235 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReadHandler.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReadHandler.java @@ -69,7 +69,7 @@ public interface CommitLogReadHandler * * @param m deserialized mutation * @param size serialized size of the mutation - * @param entryLocation filePointer offset inside the CommitLogSegment for the record + * @param entryLocation filePointer offset inside the CommitLogSegment for the end of the record * @param desc CommitLogDescriptor for mutation being processed */ void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java index 5fb1d5d..301c832 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java +++ 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java @@ -96,6 +96,14 @@ public class CommitLogReader } /** + * Reads all mutations from passed in file from minPosition + */ + public void readCommitLogSegment(CommitLogReadHandler handler, File file, CommitLogPosition minPosition, boolean tolerateTruncation) throws IOException + { + readCommitLogSegment(handler, file, minPosition, ALL_MUTATIONS, tolerateTruncation); + } + + /** * Reads passed in file fully, up to mutationLimit count */ @VisibleForTesting @@ -357,7 +365,7 @@ public class CommitLogReader * @param inputBuffer raw byte array w/Mutation data * @param size deserialized size of mutation * @param minPosition We need to suppress replay of mutations that are before the required minPosition - * @param entryLocation filePointer offset of mutation within CommitLogSegment + * @param entryLocation filePointer offset of end of mutation within CommitLogSegment * @param desc CommitLogDescriptor being worked on */ @VisibleForTesting http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 961107c..d1e63e6 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -34,17 +34,20 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.config.Config; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.config.DatabaseDescriptor; import 
org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; @@ -72,6 +75,9 @@ public class CommitLogReplayer implements CommitLogReadHandler private final CommitLogArchiver archiver; @VisibleForTesting + protected boolean sawCDCMutation; + + @VisibleForTesting protected CommitLogReader commitLogReader; CommitLogReplayer(CommitLog commitLog, @@ -130,14 +136,52 @@ public class CommitLogReplayer implements CommitLogReadHandler public void replayPath(File file, boolean tolerateTruncation) throws IOException { + sawCDCMutation = false; commitLogReader.readCommitLogSegment(this, file, globalPosition, CommitLogReader.ALL_MUTATIONS, tolerateTruncation); + if (sawCDCMutation) + handleCDCReplayCompletion(file); } public void replayFiles(File[] clogs) throws IOException { - commitLogReader.readAllFiles(this, clogs, globalPosition); + for (int i = 0; i < clogs.length; i++) + { + sawCDCMutation = false; + commitLogReader.readCommitLogSegment(this, clogs[i], globalPosition, i == clogs.length - 1); + if (sawCDCMutation) + handleCDCReplayCompletion(clogs[i]); + } } + + /** + * Upon replay completion, CDC needs to hard-link files in the CDC folder and calculate index files so consumers can + * begin their work. 
+ */ + private void handleCDCReplayCompletion(File f) throws IOException + { + // Can only reach this point if CDC is enabled, thus we have a CDCSegmentManager + ((CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager).addCDCSize(f.length()); + + File dest = new File(DatabaseDescriptor.getCDCLogLocation(), f.getName()); + + // If hard link already exists, assume it's from a previous node run. If people are mucking around in the cdc_raw + // directory that's on them. + if (!dest.exists()) + FileUtils.createHardLink(f, dest); + + // The reader has already verified we can deserialize the descriptor. + CommitLogDescriptor desc; + try(RandomAccessReader reader = RandomAccessReader.open(f)) + { + desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext()); + assert desc != null; + assert f.length() < Integer.MAX_VALUE; + CommitLogSegment.writeCDCIndexFile(desc, (int)f.length(), true); + } + } + + /** * Flushes all keyspaces associated with this replayer in parallel, blocking until their flushes are complete. 
* @return the number of mutations replayed @@ -366,6 +410,9 @@ public class CommitLogReplayer implements CommitLogReadHandler public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc) { + if (DatabaseDescriptor.isCDCEnabled() && m.trackedByCDC()) + sawCDCMutation = true; + pendingMutationBytes += size; futures.offer(mutationInitiator.initiateMutation(m, desc.id, http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index 5edf72b..8dd84e0 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -18,6 +18,7 @@ package org.apache.cassandra.db.commitlog; import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; @@ -28,6 +29,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.CRC32; +import com.google.common.annotations.VisibleForTesting; import org.cliffc.high_scale_lib.NonBlockingHashMap; import com.codahale.metrics.Timer; @@ -92,7 +94,8 @@ public abstract class CommitLogSegment // Everything before this offset has been synced and written. The SYNC_MARKER_SIZE bytes after // each sync are reserved, and point forwards to the next such offset. The final // sync marker in a segment will be zeroed out, or point to a position too close to the EOF to fit a marker. - private volatile int lastSyncedOffset; + @VisibleForTesting + volatile int lastSyncedOffset; // The end position of the buffer. Initially set to its capacity and updated to point to the last written position // as the segment is being closed. 
@@ -212,7 +215,10 @@ public abstract class CommitLogSegment opGroup.close(); return null; } - markDirty(mutation, position); + + for (PartitionUpdate update : mutation.getPartitionUpdates()) + coverInMap(tableDirty, update.metadata().id, position); + return new Allocation(this, opGroup, position, (ByteBuffer) buffer.duplicate().position(position).limit(position + size)); } catch (Throwable t) @@ -333,6 +339,9 @@ public abstract class CommitLogSegment // Possibly perform compression or encryption, writing to file and flush. write(startMarker, sectionEnd); + if (cdcState == CDCState.CONTAINS) + writeCDCIndexFile(descriptor, sectionEnd, close); + // Signal the sync as complete. lastSyncedOffset = nextMarker; if (close) @@ -341,6 +350,27 @@ public abstract class CommitLogSegment } /** + * We persist the offset of the last data synced to disk so clients can parse only durable data if they choose. Data + * in shared / memory-mapped buffers reflects un-synced data so we need an external sentinel for clients to read to + * determine actual durable data persisted. + */ + public static void writeCDCIndexFile(CommitLogDescriptor desc, int offset, boolean complete) + { + try(FileWriter writer = new FileWriter(new File(DatabaseDescriptor.getCDCLogLocation(), desc.cdcIndexFileName()))) + { + writer.write(String.valueOf(offset)); + if (complete) + writer.write("\nCOMPLETED"); + writer.flush(); + } + catch (IOException e) + { + if (!CommitLog.instance.handleCommitError("Failed to sync CDC Index: " + desc.cdcIndexFileName(), e)) + throw new RuntimeException(e); + } + } + + /** * Create a sync marker to delineate sections of the commit log, typically created on each sync of the file. * The sync marker consists of a file pointer to where the next sync marker should be (effectively declaring the length * of this section), as well as a CRC value. 
@@ -405,6 +435,22 @@ public abstract class CommitLogSegment return logFile.getName(); } + /** + * @return a File object representing the CDC directory and this file name for hard-linking + */ + public File getCDCFile() + { + return new File(DatabaseDescriptor.getCDCLogLocation(), logFile.getName()); + } + + /** + * @return a File object representing the CDC Index file holding the offset and completion status of this segment + */ + public File getCDCIndexFile() + { + return new File(DatabaseDescriptor.getCDCLogLocation(), descriptor.cdcIndexFileName()); + } + void waitForFinalSync() { while (true) @@ -475,12 +521,6 @@ public abstract class CommitLogSegment i.expandToCover(value); } - void markDirty(Mutation mutation, int allocatedPosition) - { - for (PartitionUpdate update : mutation.getPartitionUpdates()) - coverInMap(tableDirty, update.metadata().id, allocatedPosition); - } - /** * Marks the ColumnFamily specified by id as clean for this log segment. If the * given context argument is contained in this file, it will only mark the CF as @@ -623,6 +663,7 @@ public abstract class CommitLogSegment // Also synchronized in CDCSizeTracker.processNewSegment and .processDiscardedSegment synchronized(cdcStateLock) { + // Need duplicate CONTAINS to be idempotent since 2 threads can race on this lock if (cdcState == CDCState.CONTAINS && newState != CDCState.CONTAINS) throw new IllegalArgumentException("Cannot transition from CONTAINS to any other state."); @@ -673,6 +714,9 @@ public abstract class CommitLogSegment segment.waitForSync(position, waitingOnCommit); } + /** + * Returns the position in the CommitLogSegment at the end of this allocation. 
+ */ public CommitLogPosition getCommitLogPosition() { return new CommitLogPosition(segment.id, buffer.limit()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java index a91384f..4d31aad 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java @@ -32,9 +32,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.*; +import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.commitlog.CommitLogSegment.CDCState; -import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.exceptions.CDCWriteException; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.DirectorySizeCalculator; import org.apache.cassandra.utils.NoSpamLogger; @@ -64,12 +64,20 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager cdcSizeTracker.processDiscardedSegment(segment); - if (segment.getCDCState() == CDCState.CONTAINS) - FileUtils.renameWithConfirm(segment.logFile.getAbsolutePath(), DatabaseDescriptor.getCDCLogLocation() + File.separator + segment.logFile.getName()); - else + if (delete) + FileUtils.deleteWithConfirm(segment.logFile); + + if (segment.getCDCState() != CDCState.CONTAINS) { - if (delete) - FileUtils.deleteWithConfirm(segment.logFile); + // Always delete hard-link from cdc folder if this segment didn't contain CDC data. Note: File may not exist + // if processing discard during startup. 
+ File cdcLink = segment.getCDCFile(); + if (cdcLink.exists()) + FileUtils.deleteWithConfirm(cdcLink); + + File cdcIndexFile = segment.getCDCIndexFile(); + if (cdcIndexFile.exists()) + FileUtils.deleteWithConfirm(cdcIndexFile); } } @@ -89,10 +97,10 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager * @param mutation Mutation to allocate in segment manager * @param size total size (overhead + serialized) of mutation * @return the created Allocation object - * @throws WriteTimeoutException If segment disallows CDC mutations, we throw WTE + * @throws CDCWriteException If segment disallows CDC mutations, we throw */ @Override - public CommitLogSegment.Allocation allocate(Mutation mutation, int size) throws WriteTimeoutException + public CommitLogSegment.Allocation allocate(Mutation mutation, int size) throws CDCWriteException { CommitLogSegment segment = allocatingFrom(); CommitLogSegment.Allocation alloc; @@ -113,44 +121,46 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager return alloc; } - private void throwIfForbidden(Mutation mutation, CommitLogSegment segment) throws WriteTimeoutException + private void throwIfForbidden(Mutation mutation, CommitLogSegment segment) throws CDCWriteException { if (mutation.trackedByCDC() && segment.getCDCState() == CDCState.FORBIDDEN) { cdcSizeTracker.submitOverflowSizeRecalculation(); + String logMsg = String.format("Rejecting mutation to keyspace %s. Free up space in %s by processing CDC logs.", + mutation.getKeyspaceName(), DatabaseDescriptor.getCDCLogLocation()); NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 10, TimeUnit.SECONDS, - "Rejecting Mutation containing CDC-enabled table. 
Free up space in {}.", - DatabaseDescriptor.getCDCLogLocation()); - throw new WriteTimeoutException(WriteType.CDC, ConsistencyLevel.LOCAL_ONE, 0, 1); + logMsg); + throw new CDCWriteException(logMsg); } } /** - * Move files to cdc_raw after replay, since recovery will flush to SSTable and these mutations won't be available - * in the CL subsystem otherwise. - */ - void handleReplayedSegment(final File file) - { - logger.trace("Moving (Unopened) segment {} to cdc_raw directory after replay", file); - FileUtils.renameWithConfirm(file.getAbsolutePath(), DatabaseDescriptor.getCDCLogLocation() + File.separator + file.getName()); - cdcSizeTracker.addFlushedSize(file.length()); - } - - /** * On segment creation, flag whether the segment should accept CDC mutations or not based on the total currently * allocated unflushed CDC segments and the contents of cdc_raw */ public CommitLogSegment createSegment() { CommitLogSegment segment = CommitLogSegment.createSegment(commitLog, this); + + // Hard link file in cdc folder for realtime tracking + FileUtils.createHardLink(segment.logFile, segment.getCDCFile()); + cdcSizeTracker.processNewSegment(segment); return segment; } /** + * For use after replay when replayer hard-links / adds tracking of replayed segments + */ + public void addCDCSize(long size) + { + cdcSizeTracker.addSize(size); + } + + /** * Tracks total disk usage of CDC subsystem, defined by the summation of all unflushed CommitLogSegments with CDC * data in them and all segments archived into cdc_raw. 
* @@ -162,7 +172,6 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager private final RateLimiter rateLimiter = RateLimiter.create(1000.0 / DatabaseDescriptor.getCDCDiskCheckInterval()); private ExecutorService cdcSizeCalculationExecutor; private CommitLogSegmentManagerCDC segmentManager; - private volatile long unflushedCDCSize; // Used instead of size during walk to remove chance of over-allocation private volatile long sizeInProgress = 0; @@ -179,7 +188,6 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager public void start() { size = 0; - unflushedCDCSize = 0; cdcSizeCalculationExecutor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.DiscardPolicy()); } @@ -202,7 +210,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager ? CDCState.FORBIDDEN : CDCState.PERMITTED); if (segment.getCDCState() == CDCState.PERMITTED) - unflushedCDCSize += defaultSegmentSize(); + size += defaultSegmentSize(); } // Take this opportunity to kick off a recalc to pick up any consumer file deletion. @@ -218,7 +226,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager if (segment.getCDCState() == CDCState.CONTAINS) size += segment.onDiskSize(); if (segment.getCDCState() != CDCState.FORBIDDEN) - unflushedCDCSize -= defaultSegmentSize(); + size -= defaultSegmentSize(); } // Take this opportunity to kick off a recalc to pick up any consumer file deletion. 
@@ -278,19 +286,20 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager return FileVisitResult.CONTINUE; } - private void addFlushedSize(long toAdd) + + public void shutdown() { - size += toAdd; + cdcSizeCalculationExecutor.shutdown(); } - private long totalCDCSizeOnDisk() + private void addSize(long toAdd) { - return unflushedCDCSize + size; + size += toAdd; } - public void shutdown() + private long totalCDCSizeOnDisk() { - cdcSizeCalculationExecutor.shutdown(); + return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java index 86e886b..b9bd744 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java @@ -61,19 +61,7 @@ public class CommitLogSegmentManagerStandard extends AbstractCommitLogSegmentMan return alloc; } - /** - * Simply delete untracked segment files w/standard, as it'll be flushed to sstables during recovery - * - * @param file segment file that is no longer in use. 
- */ - void handleReplayedSegment(final File file) - { - // (don't decrease managed size, since this was never a "live" segment) - logger.trace("(Unopened) segment {} is no longer needed and will be deleted now", file); - FileUtils.deleteWithConfirm(file); - } - - public CommitLogSegment createSegment() + public CommitLogSegment createSegment() { return CommitLogSegment.createSegment(commitLog, this); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/exceptions/CDCWriteException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/CDCWriteException.java b/src/java/org/apache/cassandra/exceptions/CDCWriteException.java new file mode 100644 index 0000000..d60c1d3 --- /dev/null +++ b/src/java/org/apache/cassandra/exceptions/CDCWriteException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.exceptions; + +public class CDCWriteException extends RequestExecutionException +{ + public CDCWriteException(String msg) + { + super(ExceptionCode.CDC_WRITE_FAILURE, msg); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/exceptions/ExceptionCode.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java index 6ad0577..9324110 100644 --- a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java +++ b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java @@ -33,15 +33,16 @@ public enum ExceptionCode BAD_CREDENTIALS (0x0100), // 1xx: problem during request execution - UNAVAILABLE (0x1000), - OVERLOADED (0x1001), - IS_BOOTSTRAPPING(0x1002), - TRUNCATE_ERROR (0x1003), - WRITE_TIMEOUT (0x1100), - READ_TIMEOUT (0x1200), - READ_FAILURE (0x1300), - FUNCTION_FAILURE(0x1400), - WRITE_FAILURE (0x1500), + UNAVAILABLE (0x1000), + OVERLOADED (0x1001), + IS_BOOTSTRAPPING (0x1002), + TRUNCATE_ERROR (0x1003), + WRITE_TIMEOUT (0x1100), + READ_TIMEOUT (0x1200), + READ_FAILURE (0x1300), + FUNCTION_FAILURE (0x1400), + WRITE_FAILURE (0x1500), + CDC_WRITE_FAILURE (0x1600), // 2xx: problem validating the request SYNTAX_ERROR (0x2000), http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java index ac4b3dc..9163d56 100644 --- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java @@ -151,6 +151,9 @@ public class ErrorMessage extends Message.Response case CONFIG_ERROR: te = 
new ConfigurationException(msg); break; + case CDC_WRITE_FAILURE: + te = new CDCWriteException(msg); + break; case ALREADY_EXISTS: String ksName = CBUtil.readString(body); String cfName = CBUtil.readString(body); @@ -306,6 +309,8 @@ public class ErrorMessage extends Message.Response return new WriteTimeoutException(wfe.writeType, wfe.consistency, wfe.received, wfe.blockFor); case FUNCTION_FAILURE: return new InvalidRequestException(msg.toString()); + case CDC_WRITE_FAILURE: + return new InvalidRequestException(msg.toString()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/test/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml index 97a7e7a..ead2a88 100644 --- a/test/conf/cassandra.yaml +++ b/test/conf/cassandra.yaml @@ -9,6 +9,8 @@ commitlog_sync: batch commitlog_sync_batch_window_in_ms: 1.0 commitlog_segment_size_in_mb: 5 commitlog_directory: build/test/cassandra/commitlog +# commitlog_compression: +# - class_name: LZ4Compressor cdc_raw_directory: build/test/cassandra/cdc_raw cdc_enabled: false hints_directory: build/test/cassandra/hints http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/test/conf/cdc.yaml ---------------------------------------------------------------------- diff --git a/test/conf/cdc.yaml b/test/conf/cdc.yaml index f79930a..8fb9427 100644 --- a/test/conf/cdc.yaml +++ b/test/conf/cdc.yaml @@ -1 +1,4 @@ cdc_enabled: true +# Compression enabled since uncompressed + cdc isn't compatible w/Windows +commitlog_compression: + - class_name: LZ4Compressor http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java new file mode 100644 index 
0000000..3695da8 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + +import java.io.File; +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.rows.SerializationHelper; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.RebufferingInputStream; + +/** + * Utility class that flags the replayer as having seen a CDC mutation and calculates offset but doesn't apply mutations + */ +public class CDCTestReplayer extends CommitLogReplayer +{ + private static final Logger logger = LoggerFactory.getLogger(CDCTestReplayer.class); + + public CDCTestReplayer() throws IOException + { + super(CommitLog.instance, CommitLogPosition.NONE, null, ReplayFilter.create()); + CommitLog.instance.sync(); + commitLogReader = new CommitLogTestReader(); + } + + public void examineCommitLog() throws IOException + { + replayFiles(new File(DatabaseDescriptor.getCommitLogLocation()).listFiles()); + 
} + + private class CommitLogTestReader extends CommitLogReader + { + @Override + protected void readMutation(CommitLogReadHandler handler, + byte[] inputBuffer, + int size, + CommitLogPosition minPosition, + final int entryLocation, + final CommitLogDescriptor desc) throws IOException + { + RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size); + Mutation mutation; + try + { + mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL); + if (mutation.trackedByCDC()) + sawCDCMutation = true; + } + catch (IOException e) + { + // Test fails. + throw new AssertionError(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java index 3ae1ae4..80dfd01 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java @@ -18,10 +18,11 @@ package org.apache.cassandra.db.commitlog; -import java.io.File; -import java.io.IOException; +import java.io.*; import java.nio.ByteBuffer; -import java.util.Random; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; import org.junit.Assert; import org.junit.Assume; @@ -29,18 +30,18 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.RowUpdateBuilder; import org.apache.cassandra.db.commitlog.CommitLogSegment.CDCState; -import 
org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.exceptions.CDCWriteException; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.schema.TableMetadata; public class CommitLogSegmentManagerCDCTest extends CQLTester { - private static Random random = new Random(); + private static final Random random = new Random(); @BeforeClass public static void checkConfig() @@ -49,15 +50,17 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester } @Before - public void before() throws IOException + public void beforeTest() throws Throwable { - CommitLog.instance.resetUnsafe(true); - for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles()) - FileUtils.deleteWithConfirm(f); + super.beforeTest(); + // Need to clean out any files from previous test runs. Prevents flaky test failures. + CommitLog.instance.stopUnsafe(true); + CommitLog.instance.start(); + ((CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager).updateCDCTotalSize(); } @Test - public void testCDCWriteTimeout() throws Throwable + public void testCDCWriteFailure() throws Throwable { createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;"); CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager; @@ -68,7 +71,7 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester try { DatabaseDescriptor.setCDCSpaceInMB(32); - // Spin until we hit CDC capacity and make sure we get a WriteTimeout + // Spin until we hit CDC capacity and make sure we get a CDCWriteException try { // Should trigger on anything < 20:1 compression ratio during compressed test @@ -78,9 +81,9 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3)) .build().apply(); } - Assert.fail("Expected WriteTimeoutException from full CDC but did not receive it."); + Assert.fail("Expected CDCWriteException from full 
CDC but did not receive it."); } - catch (WriteTimeoutException e) + catch (CDCWriteException e) { // expected, do nothing } @@ -111,45 +114,6 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester } @Test - public void testCLSMCDCDiscardLogic() throws Throwable - { - CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager; - - createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=false;"); - for (int i = 0; i < 8; i++) - { - new RowUpdateBuilder(currentTableMetadata(), 0, i) - .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 4)) // fit 3 in a segment - .build().apply(); - } - - // Should have 4 segments CDC since we haven't flushed yet, 3 PERMITTED, one of which is active, and 1 PERMITTED, in waiting - Assert.assertEquals(4 * DatabaseDescriptor.getCommitLogSegmentSize(), cdcMgr.updateCDCTotalSize()); - expectCurrentCDCState(CDCState.PERMITTED); - CommitLog.instance.forceRecycleAllSegments(); - - // on flush, these PERMITTED should be deleted - Assert.assertEquals(0, new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length); - - createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;"); - for (int i = 0; i < 8; i++) - { - new RowUpdateBuilder(currentTableMetadata(), 0, i) - .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 4)) - .build().apply(); - } - // 4 total again, 3 CONTAINS, 1 in waiting PERMITTED - Assert.assertEquals(4 * DatabaseDescriptor.getCommitLogSegmentSize(), cdcMgr.updateCDCTotalSize()); - CommitLog.instance.forceRecycleAllSegments(); - expectCurrentCDCState(CDCState.PERMITTED); - - // On flush, PERMITTED is deleted, CONTAINS is preserved. 
- cdcMgr.awaitManagementTasksCompletion(); - int seen = getCDCRawCount(); - Assert.assertTrue("Expected >3 files in cdc_raw, saw: " + seen, seen >= 3); - } - - @Test public void testSegmentFlaggingOnCreation() throws Throwable { CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager; @@ -160,7 +124,7 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester { DatabaseDescriptor.setCDCSpaceInMB(16); TableMetadata ccfm = Keyspace.open(keyspace()).getColumnFamilyStore(ct).metadata(); - // Spin until we hit CDC capacity and make sure we get a WriteTimeout + // Spin until we hit CDC capacity and make sure we get a CDCWriteException try { for (int i = 0; i < 1000; i++) @@ -169,15 +133,17 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3)) .build().apply(); } - Assert.fail("Expected WriteTimeoutException from full CDC but did not receive it."); + Assert.fail("Expected CDCWriteException from full CDC but did not receive it."); } - catch (WriteTimeoutException e) { } + catch (CDCWriteException e) { } expectCurrentCDCState(CDCState.FORBIDDEN); CommitLog.instance.forceRecycleAllSegments(); cdcMgr.awaitManagementTasksCompletion(); - new File(DatabaseDescriptor.getCDCLogLocation()).listFiles()[0].delete(); + // Delete all files in cdc_raw + for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles()) + f.delete(); cdcMgr.updateCDCTotalSize(); // Confirm cdc update process changes flag on active segment expectCurrentCDCState(CDCState.PERMITTED); @@ -186,12 +152,6 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles()) { FileUtils.deleteWithConfirm(f); } - - // Set space to 0, confirm newly allocated segments are FORBIDDEN - DatabaseDescriptor.setCDCSpaceInMB(0); - CommitLog.instance.forceRecycleAllSegments(); - 
CommitLog.instance.segmentManager.awaitManagementTasksCompletion(); - expectCurrentCDCState(CDCState.FORBIDDEN); } finally { @@ -199,6 +159,259 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester } } + @Test + public void testCDCIndexFileWriteOnSync() throws IOException + { + createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;"); + new RowUpdateBuilder(currentTableMetadata(), 0, 1) + .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3)) + .build().apply(); + + CommitLog.instance.sync(); + CommitLogSegment currentSegment = CommitLog.instance.segmentManager.allocatingFrom(); + int syncOffset = currentSegment.lastSyncedOffset; + + // Confirm index file is written + File cdcIndexFile = currentSegment.getCDCIndexFile(); + Assert.assertTrue("Index file not written: " + cdcIndexFile, cdcIndexFile.exists()); + + // Read index value and confirm it's == end from last sync + BufferedReader in = new BufferedReader(new FileReader(cdcIndexFile)); + String input = in.readLine(); + Integer offset = Integer.parseInt(input); + Assert.assertEquals(syncOffset, (long)offset); + in.close(); + } + + @Test + public void testCompletedFlag() throws IOException + { + createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;"); + CommitLogSegment initialSegment = CommitLog.instance.segmentManager.allocatingFrom(); + DatabaseDescriptor.setCDCSpaceInMB(8); + try + { + for (int i = 0; i < 1000; i++) + { + new RowUpdateBuilder(currentTableMetadata(), 0, 1) + .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3)) + .build().apply(); + } + } + catch (CDCWriteException ce) + { + // pass. 
Expected since we'll have a file or two linked on restart of CommitLog due to replay + } + + CommitLog.instance.forceRecycleAllSegments(); + + // Confirm index file is written + File cdcIndexFile = initialSegment.getCDCIndexFile(); + Assert.assertTrue("Index file not written: " + cdcIndexFile, cdcIndexFile.exists()); + + // Read index file and confirm second line is COMPLETED + BufferedReader in = new BufferedReader(new FileReader(cdcIndexFile)); + String input = in.readLine(); + input = in.readLine(); + Assert.assertTrue("Expected COMPLETED in index file, got: " + input, input.equals("COMPLETED")); + in.close(); + } + + @Test + public void testDeleteLinkOnDiscardNoCDC() throws Throwable + { + createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=false;"); + new RowUpdateBuilder(currentTableMetadata(), 0, 1) + .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3)) + .build().apply(); + CommitLogSegment currentSegment = CommitLog.instance.segmentManager.allocatingFrom(); + + // Confirm that, with no CDC data present, we've hard-linked but have no index file + Path linked = new File(DatabaseDescriptor.getCDCLogLocation(), currentSegment.logFile.getName()).toPath(); + File cdcIndexFile = currentSegment.getCDCIndexFile(); + Assert.assertTrue("File does not exist: " + linked, Files.exists(linked)); + Assert.assertFalse("Expected index file to not be created but found: " + cdcIndexFile, cdcIndexFile.exists()); + + // Sync and confirm no index written as index is written on flush + CommitLog.instance.sync(); + Assert.assertTrue("File does not exist: " + linked, Files.exists(linked)); + Assert.assertFalse("Expected index file to not be created but found: " + cdcIndexFile, cdcIndexFile.exists()); + + // Force a full recycle and confirm hard-link is deleted + CommitLog.instance.forceRecycleAllSegments(); + CommitLog.instance.segmentManager.awaitManagementTasksCompletion(); + Assert.assertFalse("Expected hard link to CLS to be 
deleted on non-cdc segment: " + linked, Files.exists(linked)); + } + + @Test + public void testRetainLinkOnDiscardCDC() throws Throwable + { + createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;"); + CommitLogSegment currentSegment = CommitLog.instance.segmentManager.allocatingFrom(); + File cdcIndexFile = currentSegment.getCDCIndexFile(); + Assert.assertFalse("Expected no index file before flush but found: " + cdcIndexFile, cdcIndexFile.exists()); + + new RowUpdateBuilder(currentTableMetadata(), 0, 1) + .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3)) + .build().apply(); + + Path linked = new File(DatabaseDescriptor.getCDCLogLocation(), currentSegment.logFile.getName()).toPath(); + // Confirm that, with CDC data present but not yet flushed, we've hard-linked but have no index file + Assert.assertTrue("File does not exist: " + linked, Files.exists(linked)); + + // Sync and confirm index written as index is written on flush + CommitLog.instance.sync(); + Assert.assertTrue("File does not exist: " + linked, Files.exists(linked)); + Assert.assertTrue("Expected cdc index file after flush but found none: " + cdcIndexFile, cdcIndexFile.exists()); + + // Force a full recycle and confirm all files remain + CommitLog.instance.forceRecycleAllSegments(); + Assert.assertTrue("File does not exist: " + linked, Files.exists(linked)); + Assert.assertTrue("Expected cdc index file after recycle but found none: " + cdcIndexFile, cdcIndexFile.exists()); + } + + @Test + public void testReplayLogic() throws IOException + { + // Assert.assertEquals(0, new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length); + String table_name = createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;"); + + DatabaseDescriptor.setCDCSpaceInMB(8); + TableMetadata ccfm = Keyspace.open(keyspace()).getColumnFamilyStore(table_name).metadata(); + try + { + for (int i = 0; i < 1000; i++) + { + new 
RowUpdateBuilder(ccfm, 0, i) + .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3)) + .build().apply(); + } + Assert.fail("Expected CDCWriteException from full CDC but did not receive it."); + } + catch (CDCWriteException e) + { + // pass + } + + CommitLog.instance.sync(); + CommitLog.instance.stopUnsafe(false); + + // Build up a list of expected index files after replay and then clear out cdc_raw + List<CDCIndexData> oldData = parseCDCIndexData(); + for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles()) + FileUtils.deleteWithConfirm(f.getAbsolutePath()); + + try + { + Assert.assertEquals("Expected 0 files in CDC folder after deletion. ", + 0, new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length); + } + finally + { + // If we don't have a started commitlog, assertions will cause the test to hang. I assume it's some assumption + // hang in the shutdown on CQLTester trying to clean up / drop keyspaces / tables and hanging applying + // mutations. + CommitLog.instance.start(); + CommitLog.instance.segmentManager.awaitManagementTasksCompletion(); + } + CDCTestReplayer replayer = new CDCTestReplayer(); + replayer.examineCommitLog(); + + // Rough sanity check -> should be files there now. + Assert.assertTrue("Expected non-zero number of files in CDC folder after restart.", + new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length > 0); + + // Confirm all the old indexes in old are present and >= the original offset, as we flag the entire segment + // as cdc written on a replay. 
+ List<CDCIndexData> newData = parseCDCIndexData(); + for (CDCIndexData cid : oldData) + { + boolean found = false; + for (CDCIndexData ncid : newData) + { + if (cid.fileName.equals(ncid.fileName)) + { + Assert.assertTrue("New CDC index file expected to have >= offset in old.", ncid.offset >= cid.offset); + found = true; + } + } + if (!found) + { + StringBuilder errorMessage = new StringBuilder(); + errorMessage.append(String.format("Missing old CDCIndexData in new set after replay: %s\n", cid)); + errorMessage.append("List of CDCIndexData in new set of indexes after replay:\n"); + for (CDCIndexData ncid : newData) + errorMessage.append(String.format(" %s\n", ncid)); + Assert.fail(errorMessage.toString()); + } + } + + // And make sure we don't have new CDC Indexes we don't expect + for (CDCIndexData ncid : newData) + { + boolean found = false; + for (CDCIndexData cid : oldData) + { + if (cid.fileName.equals(ncid.fileName)) + found = true; + } + if (!found) + Assert.fail(String.format("Unexpected new CDCIndexData found after replay: %s\n", ncid)); + } + } + + private List<CDCIndexData> parseCDCIndexData() + { + List<CDCIndexData> results = new ArrayList<>(); + try + { + for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles()) + { + if (f.getName().contains("_cdc.idx")) + results.add(new CDCIndexData(f)); + } + } + catch (IOException e) + { + Assert.fail(String.format("Failed to parse CDCIndexData: %s", e.getMessage())); + } + return results; + } + + private static class CDCIndexData + { + private final String fileName; + private final int offset; + + CDCIndexData(File f) throws IOException + { + String line = ""; + try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(f)))) + { + line = br.readLine(); + } + catch (Exception e) + { + throw e; + } + fileName = f.getName(); + offset = Integer.parseInt(line); + } + + @Override + public String toString() + { + return String.format("%s,%d", fileName, offset); + } + + 
@Override + public boolean equals(Object other) + { + CDCIndexData cid = (CDCIndexData)other; + return fileName.equals(cid.fileName) && offset == cid.offset; + } + } + private ByteBuffer randomizeBuffer(int size) { byte[] toWrap = new byte[size]; @@ -211,9 +424,15 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester return new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length; } - private void expectCurrentCDCState(CDCState state) + private void expectCurrentCDCState(CDCState expectedState) { - Assert.assertEquals("Received unexpected CDCState on current allocatingFrom segment.", - state, CommitLog.instance.segmentManager.allocatingFrom().getCDCState()); + CDCState currentState = CommitLog.instance.segmentManager.allocatingFrom().getCDCState(); + if (currentState != expectedState) + { + logger.error("expectCurrentCDCState violation! Expected state: {}. Found state: {}. Current CDC allocation: {}", + expectedState, currentState, ((CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager).updateCDCTotalSize()); + Assert.fail(String.format("Received unexpected CDCState on current allocatingFrom segment. Expected: %s. Received: %s", + expectedState, currentState)); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org