This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cep-7-sai in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 00ac4a75d6f0fe924169ad07082a2dd0f0741efa Author: Caleb Rackliffe <calebrackli...@gmail.com> AuthorDate: Fri Jul 14 01:44:26 2023 -0700 Importer should build SSTable indexes successfully before making new SSTables readable - Avoid validation in response to SSTableAddedNotification, as it should already have been done somewhere else - Change SSTableWriter to prevent commit when a failure is thrown out of an index build patch by Caleb Rackliffe; reviewed by Mike Adamson and Andres de la Peña for CASSANDRA-18670 --- .../cassandra/config/DatabaseDescriptor.java | 6 + .../org/apache/cassandra/db/SSTableImporter.java | 5 + .../db/streaming/CassandraIncomingFile.java | 8 + .../db/streaming/CassandraStreamReceiver.java | 16 +- src/java/org/apache/cassandra/index/Index.java | 38 ++- .../org/apache/cassandra/index/IndexRegistry.java | 8 +- .../cassandra/index/SecondaryIndexManager.java | 111 +++++++- .../apache/cassandra/index/sai/IndexContext.java | 1 - .../cassandra/index/sai/SSTableContextManager.java | 1 - .../cassandra/index/sai/StorageAttachedIndex.java | 6 + .../index/sai/StorageAttachedIndexBuilder.java | 8 +- .../index/sai/StorageAttachedIndexGroup.java | 42 ++- .../index/sai/disk/StorageAttachedIndexWriter.java | 24 +- .../index/sai/disk/format/IndexDescriptor.java | 42 ++- .../index/sai/disk/format/OnDiskFormat.java | 11 +- .../index/sai/disk/v1/V1OnDiskFormat.java | 51 ++-- .../org/apache/cassandra/index/sasi/SASIIndex.java | 1 + .../cassandra/io/sstable/format/SSTableWriter.java | 11 +- .../test/sai/ImportIndexedSSTablesTest.java | 283 +++++++++++++++++++++ .../test/sai/IndexStreamingFailureTest.java | 199 +++++++++++++++ .../org/apache/cassandra/index/sai/SAITester.java | 2 +- .../index/sai/cql/StorageAttachedIndexDDLTest.java | 31 ++- .../sai/metrics/SegmentFlushingFailureTester.java | 34 ++- 23 files changed, 826 insertions(+), 113 deletions(-) diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index f288baefbf..4e5a26fafc 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -3552,6 +3552,12 @@ public class DatabaseDescriptor return conf.stream_entire_sstables; } + @VisibleForTesting + public static boolean setStreamEntireSSTables(boolean value) + { + return conf.stream_entire_sstables = value; + } + public static DurationSpec.LongMillisecondsBound getStreamTransferTaskTimeout() { return conf.stream_transfer_task_timeout; diff --git a/src/java/org/apache/cassandra/db/SSTableImporter.java b/src/java/org/apache/cassandra/db/SSTableImporter.java index b7cf3b7718..c2544f7d06 100644 --- a/src/java/org/apache/cassandra/db/SSTableImporter.java +++ b/src/java/org/apache/cassandra/db/SSTableImporter.java @@ -186,6 +186,11 @@ public class SSTableImporter try (Refs<SSTableReader> refs = Refs.ref(newSSTables)) { abortIfDraining(); + + // Validate existing SSTable-attached indexes, and then build any that are missing: + if (!cfs.indexManager.validateSSTableAttachedIndexes(newSSTables, false)) + cfs.indexManager.buildSSTableAttachedIndexesBlocking(newSSTables); + cfs.getTracker().addSSTables(newSSTables); for (SSTableReader reader : newSSTables) { diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java index 3e87d67fef..e8a6fbcc7c 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java @@ -47,6 +47,8 @@ public class CassandraIncomingFile implements IncomingStream private volatile long size = -1; private volatile int numFiles = 1; + private volatile boolean isEntireSSTable = false; + private static final Logger logger = LoggerFactory.getLogger(CassandraIncomingFile.class); public CassandraIncomingFile(ColumnFamilyStore 
cfs, StreamSession session, StreamMessageHeader header) @@ -72,6 +74,7 @@ public class CassandraIncomingFile implements IncomingStream IStreamReader reader; if (streamHeader.isEntireSSTable) { + isEntireSSTable = true; reader = new CassandraEntireSSTableStreamReader(header, streamHeader, session); numFiles = streamHeader.componentManifest.components().size(); } @@ -103,6 +106,11 @@ public class CassandraIncomingFile implements IncomingStream return numFiles; } + public boolean isEntireSSTable() + { + return isEntireSSTable; + } + @Override public TableId getTableId() { diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java index 4b5de8b8c9..5721d0b2e4 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java @@ -68,7 +68,9 @@ public class CassandraStreamReceiver implements StreamReceiver private final LifecycleTransaction txn; // holds references to SSTables received - protected Collection<SSTableReader> sstables; + protected final Collection<SSTableReader> sstables; + + protected volatile boolean receivedEntireSSTable; private final boolean requiresWritePath; @@ -114,6 +116,7 @@ public class CassandraStreamReceiver implements StreamReceiver } txn.update(finished, false); sstables.addAll(finished); + receivedEntireSSTable = file.isEntireSSTable(); } @Override @@ -223,7 +226,7 @@ public class CassandraStreamReceiver implements StreamReceiver } } - public synchronized void finishTransaction() + public synchronized void finishTransaction() { txn.finish(); } @@ -242,9 +245,16 @@ public class CassandraStreamReceiver implements StreamReceiver } else { + // Validate SSTable-attached indexes that should have streamed in an already complete state. 
When we + // don't stream the entire SSTable, validation is unnecessary, as the indexes have just been written + // via the SSTable flush observer, and an error there would have aborted the streaming transaction. + if (receivedEntireSSTable) + // If we do validate, any exception thrown doing so will also abort the streaming transaction: + cfs.indexManager.validateSSTableAttachedIndexes(readers, true); + finishTransaction(); - // add sstables (this will build secondary indexes too, see CASSANDRA-10130) + // add sstables (this will build non-SSTable-attached secondary indexes too, see CASSANDRA-10130) logger.debug("[Stream #{}] Received {} sstables from {} ({})", session.planId(), readers.size(), session.peer, readers); cfs.addSSTables(readers); diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java index 5ca9e719a2..f116fdb3e0 100644 --- a/src/java/org/apache/cassandra/index/Index.java +++ b/src/java/org/apache/cassandra/index/Index.java @@ -20,6 +20,7 @@ */ package org.apache.cassandra.index; +import java.io.UncheckedIOException; import java.util.Collection; import java.util.Collections; import java.util.Optional; @@ -61,7 +62,6 @@ import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.TableMetadata; - /** * Consisting of a top level Index interface and two sub-interfaces which handle read and write operations, * Searcher and Indexer respectively, this defines a secondary index implementation. @@ -341,13 +341,28 @@ public interface Index * Return true if this index can be built or rebuilt when the index manager determines it is necessary. Returning * false enables the index implementation (or some other component) to control if and when SSTable data is * incorporated into the index. 
- * + * <p> * This is called by SecondaryIndexManager in buildIndexBlocking, buildAllIndexesBlocking and rebuildIndexesBlocking * where a return value of false causes the index to be exluded from the set of those which will process the * SSTable data. * @return if the index should be included in the set which processes SSTable data, false otherwise. */ - public boolean shouldBuildBlocking(); + boolean shouldBuildBlocking(); + + /** + * For an index to qualify as SSTable-attached, it must do two things: + * <p> + * 1.) It must use {@link SSTableFlushObserver} to incrementally build indexes as SSTables are written. This ensures + * that non-entire file streaming builds them correctly before the streaming transaction finishes. + * <p> + * 2.) Its implementation of {@link SecondaryIndexBuilder} must support incremental building by SSTable. + * + * @return true if the index builds SSTable-attached on-disk components + */ + default boolean isSSTableAttached() + { + return false; + } /** * Get flush observer to observe partition/cell events generated by flushing SSTable (memtable flush or compaction). @@ -765,6 +780,23 @@ public interface Index * @return the SSTable components created by this group */ Set<Component> getComponents(); + + /** + * Validates all indexes in the group against the specified SSTables. 
+ * + * @param sstables SSTables for which indexes in the group should be built + * @param throwOnIncomplete whether to throw an error if any index in the group is incomplete + * + * @return true if all indexes in the group are complete and valid + * false if any index is incomplete and {@code throwOnIncomplete} is false + * + * @throws IllegalStateException if {@code throwOnIncomplete} is true and any index in the group is incomplete + * @throws UncheckedIOException if there is a problem validating any on-disk component of an index in the group + */ + default boolean validateSSTableAttachedIndexes(Collection<SSTableReader> sstables, boolean throwOnIncomplete) + { + return true; + } } /** diff --git a/src/java/org/apache/cassandra/index/IndexRegistry.java b/src/java/org/apache/cassandra/index/IndexRegistry.java index 4d49c693d6..308aeacd7a 100644 --- a/src/java/org/apache/cassandra/index/IndexRegistry.java +++ b/src/java/org/apache/cassandra/index/IndexRegistry.java @@ -61,7 +61,7 @@ public interface IndexRegistry /** * An empty {@code IndexRegistry} */ - public static final IndexRegistry EMPTY = new IndexRegistry() + IndexRegistry EMPTY = new IndexRegistry() { @Override public void registerIndex(Index index, Object groupKey, Supplier<Index.Group> groupSupplier) @@ -104,7 +104,7 @@ public interface IndexRegistry * but enables query validation and preparation to succeed. Useful for tools which need to prepare * CQL statements without instantiating the whole ColumnFamilyStore infrastructure. 
*/ - public static final IndexRegistry NON_DAEMON = new IndexRegistry() + IndexRegistry NON_DAEMON = new IndexRegistry() { final Index index = new Index() { @@ -279,7 +279,7 @@ public interface IndexRegistry { registerIndex(index, index, () -> new SingletonIndexGroup(index)); } - public void registerIndex(Index index, Object groupKey, Supplier<Index.Group> groupSupplier); + void registerIndex(Index index, Object groupKey, Supplier<Index.Group> groupSupplier); Collection<Index.Group> listIndexGroups(); Index getIndex(IndexMetadata indexMetadata); @@ -304,7 +304,7 @@ public interface IndexRegistry * @param table the table metadata * @return the {@code IndexRegistry} associated to the specified table */ - public static IndexRegistry obtain(TableMetadata table) + static IndexRegistry obtain(TableMetadata table) { if (!DatabaseDescriptor.isDaemonInitialized()) return NON_DAEMON; diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index 08c1a5fa6d..19985c304f 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.index; +import java.io.UncheckedIOException; import java.lang.reflect.Constructor; import java.util.*; import java.util.concurrent.Callable; @@ -63,7 +64,10 @@ import org.apache.cassandra.db.rows.*; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.index.Index.IndexBuildingSupport; import org.apache.cassandra.index.internal.CassandraIndex; -import org.apache.cassandra.index.transactions.*; +import org.apache.cassandra.index.transactions.CleanupTransaction; +import org.apache.cassandra.index.transactions.CompactionTransaction; +import org.apache.cassandra.index.transactions.IndexTransaction; +import org.apache.cassandra.index.transactions.UpdateTransaction; import 
org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.notifications.INotification; import org.apache.cassandra.notifications.INotificationConsumer; @@ -492,8 +496,93 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum } /** - * Performs a blocking (re)indexing/recovery of the specified SSTables for the specified indexes. + * Validates all index groups against the specified SSTables. + * + * @param sstables SSTables for which indexes in the group should be built + * @param throwOnIncomplete whether to throw an error if any index in the group is incomplete + * + * @return true if all indexes in all groups are complete and valid + * false if an index in any group is incomplete and {@code throwOnIncomplete} is false * + * @throws IllegalStateException if {@code throwOnIncomplete} is true and an index in any group is incomplete + * @throws UncheckedIOException if there is a problem validating any on-disk component in any group + */ + public boolean validateSSTableAttachedIndexes(Collection<SSTableReader> sstables, boolean throwOnIncomplete) + { + boolean complete = true; + + for (Index.Group group : indexGroups.values()) + { + if (group.getIndexes().stream().anyMatch(Index::isSSTableAttached)) + complete &= group.validateSSTableAttachedIndexes(sstables, throwOnIncomplete); + } + + return complete; + } + + /** + * Incrementally builds indexes for the specified SSTables in a blocking fashion. + * <p> + * This is similar to {@link #buildIndexesBlocking}, but it is designed to be used in cases where failure will + * cascade through to failing the containing operation that actuates the build. (ex. streaming and SSTable import) + * <p> + * It does not update index build status or queryablility on failure or success and does not call + * {@link #flushIndexesBlocking(Set, FutureCallback)}, as this is an artifact of the legacy non-SSTable-attached + * index implementation. 
+ * + * @param sstables the SSTables for which indexes must be built + */ + public void buildSSTableAttachedIndexesBlocking(Collection<SSTableReader> sstables) + { + Set<Index> toBuild = indexes.values().stream().filter(Index::isSSTableAttached).collect(Collectors.toSet()); + + if (toBuild.isEmpty()) + return; + + logger.info("Submitting incremental index build of {} for data in {}...", + commaSeparated(toBuild), + sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(","))); + + // Group all building tasks + Map<Index.IndexBuildingSupport, Set<Index>> byType = new HashMap<>(); + for (Index index : toBuild) + { + Set<Index> stored = byType.computeIfAbsent(index.getBuildTaskSupport(), i -> new HashSet<>()); + stored.add(index); + } + + // Schedule all index building tasks with callbacks to handle success and failure + List<Future<?>> futures = new ArrayList<>(byType.size()); + byType.forEach((buildingSupport, groupedIndexes) -> + { + SecondaryIndexBuilder builder = buildingSupport.getIndexBuildTask(baseCfs, groupedIndexes, sstables, false); + AsyncPromise<Object> build = new AsyncPromise<>(); + CompactionManager.instance.submitIndexBuild(builder).addCallback(new FutureCallback<Object>() + { + @Override + public void onFailure(Throwable t) + { + logger.warn("Failed to incrementally build indexes {}", getIndexNames(groupedIndexes)); + build.tryFailure(t); + } + + @Override + public void onSuccess(Object o) + { + logger.info("Incremental index build of {} completed", getIndexNames(groupedIndexes)); + build.trySuccess(o); + } + }); + futures.add(build); + }); + + // Finally wait for the index builds to finish + FBUtilities.waitOnFutures(futures); + } + + /** + * Performs a blocking (re)indexing/recovery of the specified SSTables for the specified indexes. 
+ * <p> * If the index doesn't support ALL {@link Index.LoadType} it performs a recovery {@link Index#getRecoveryTaskSupport()} * instead of a build {@link Index#getBuildTaskSupport()} * @@ -501,7 +590,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum * @param indexes the indexes to be (re)built for the specifed SSTables * @param isFullRebuild True if this method is invoked as a full index rebuild, false otherwise */ - @SuppressWarnings({ "unchecked" }) + @SuppressWarnings({"unchecked", "RedundantSuppression"}) private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index> indexes, boolean isFullRebuild) { if (indexes.isEmpty()) @@ -542,7 +631,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum { SecondaryIndexBuilder builder = buildingSupport.getIndexBuildTask(baseCfs, groupedIndexes, sstables, isFullRebuild); final AsyncPromise<Object> build = new AsyncPromise<>(); - CompactionManager.instance.submitIndexBuild(builder).addCallback(new FutureCallback() + CompactionManager.instance.submitIndexBuild(builder).addCallback(new FutureCallback<Object>() { @Override public void onFailure(Throwable t) @@ -584,11 +673,11 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum } // Flush all built indexes with an aynchronous callback to log the success or failure of the flush - flushIndexesBlocking(builtIndexes, new FutureCallback() + flushIndexesBlocking(builtIndexes, new FutureCallback<>() { - String indexNames = StringUtils.join(builtIndexes.stream() - .map(i -> i.getIndexMetadata().name) - .collect(Collectors.toList()), ','); + final String indexNames = StringUtils.join(builtIndexes.stream() + .map(i -> i.getIndexMetadata().name) + .collect(Collectors.toList()), ','); @Override public void onFailure(Throwable ignored) @@ -886,7 +975,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum { executeAllBlocking(indexes.values() 
.stream() - .filter(index -> !index.getBackingTable().isPresent()), + .filter(index -> index.getBackingTable().isEmpty()), index -> index.getBlockingFlushTask(baseCfsMemtable), null); } @@ -1203,7 +1292,6 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum * implementations * * @param update PartitionUpdate containing the values to be validated by registered Index implementations - * @throws InvalidRequestException */ public void validate(PartitionUpdate update) throws InvalidRequestException { @@ -1701,11 +1789,12 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum SSTableAddedNotification notice = (SSTableAddedNotification) notification; // SSTables asociated to a memtable come from a flush, so their contents have already been indexed - if (!notice.memtable().isPresent()) + if (notice.memtable().isEmpty()) buildIndexesBlocking(Lists.newArrayList(notice.added), indexes.values() .stream() .filter(Index::shouldBuildBlocking) + .filter(i -> !i.isSSTableAttached()) .collect(Collectors.toSet()), false); } diff --git a/src/java/org/apache/cassandra/index/sai/IndexContext.java b/src/java/org/apache/cassandra/index/sai/IndexContext.java index d0a5e08b16..ac33c837da 100644 --- a/src/java/org/apache/cassandra/index/sai/IndexContext.java +++ b/src/java/org/apache/cassandra/index/sai/IndexContext.java @@ -422,7 +422,6 @@ public class IndexContext { if (!sstableContext.indexDescriptor.validatePerIndexComponents(this, validation)) { - logger.warn(logMessage("Invalid per-column component for SSTable {}"), sstableContext.descriptor()); invalid.add(sstableContext); continue; } diff --git a/src/java/org/apache/cassandra/index/sai/SSTableContextManager.java b/src/java/org/apache/cassandra/index/sai/SSTableContextManager.java index 85192f525e..1c6406e0c1 100644 --- a/src/java/org/apache/cassandra/index/sai/SSTableContextManager.java +++ b/src/java/org/apache/cassandra/index/sai/SSTableContextManager.java @@ -79,7 +79,6 
@@ public class SSTableContextManager // Only validate on restart or newly refreshed SSTable. Newly built files are unlikely to be corrupted. if (!sstableContexts.containsKey(sstable) && !indexDescriptor.validatePerSSTableComponents(validation)) { - logger.warn(indexDescriptor.logMessage("Invalid per-SSTable component for SSTable {}"), sstable.descriptor); invalid.add(sstable); removeInvalidSSTableContext(sstable); continue; diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java index 5a8f7b6a58..e8c4b53f4e 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java @@ -469,6 +469,12 @@ public class StorageAttachedIndex implements Index return true; } + @Override + public boolean isSSTableAttached() + { + return true; + } + @Override public Optional<ColumnFamilyStore> getBackingTable() { diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java index 76240c6094..c45b604ffc 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java @@ -99,7 +99,10 @@ public class StorageAttachedIndexBuilder extends SecondaryIndexBuilder @Override public void build() { - logger.debug(logMessage("Starting full index build")); + logger.debug(logMessage(String.format("Starting %s %s index build...", + isInitialBuild ? "initial" : "non-initial", + isFullRebuild ? 
"full" : "partial"))); + for (Map.Entry<SSTableReader, Set<StorageAttachedIndex>> e : sstables.entrySet()) { SSTableReader sstable = e.getKey(); @@ -113,9 +116,7 @@ public class StorageAttachedIndexBuilder extends SecondaryIndexBuilder } if (indexSSTable(sstable, existing)) - { return; - } } } @@ -205,7 +206,6 @@ public class StorageAttachedIndexBuilder extends SecondaryIndexBuilder if (t instanceof InterruptedException) { - // TODO: Is there anything that makes more sense than just restoring the interrupt? logger.warn(logMessage("Interrupted while building indexes {} on SSTable {}"), indexes, sstable.descriptor); Thread.currentThread().interrupt(); return true; diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java index 7dec78c4da..fb59244a78 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java @@ -249,9 +249,9 @@ public class StorageAttachedIndexGroup implements Index.Group, INotificationCons { SSTableAddedNotification notice = (SSTableAddedNotification) notification; - // Avoid validation for index files just written following Memtable flush. - IndexValidation validate = notice.memtable().isPresent() ? IndexValidation.NONE : IndexValidation.CHECKSUM; - onSSTableChanged(Collections.emptySet(), notice.added, indexes, validate); + // Avoid validation for index files just written following Memtable flush. Otherwise, the new SSTables have + // come either from import, streaming, or a standalone tool, where they have also already been validated. 
+ onSSTableChanged(Collections.emptySet(), notice.added, indexes, IndexValidation.NONE); } else if (notification instanceof SSTableListChangedNotification) { @@ -335,6 +335,42 @@ public class StorageAttachedIndexGroup implements Index.Group, INotificationCons return incomplete; } + @Override + public boolean validateSSTableAttachedIndexes(Collection<SSTableReader> sstables, boolean throwOnIncomplete) + { + boolean complete = true; + + for (SSTableReader sstable : sstables) + { + IndexDescriptor indexDescriptor = IndexDescriptor.create(sstable); + + if (indexDescriptor.isPerSSTableIndexBuildComplete()) + { + indexDescriptor.checksumPerSSTableComponents(); + + for (StorageAttachedIndex index : indexes) + { + if (indexDescriptor.isPerColumnIndexBuildComplete(index.getIndexContext())) + indexDescriptor.checksumPerIndexComponents(index.getIndexContext()); + else if (throwOnIncomplete) + throw new IllegalStateException(indexDescriptor.logMessage("Incomplete per-column index build")); + else + complete = false; + } + } + else if (throwOnIncomplete) + { + throw new IllegalStateException(indexDescriptor.logMessage("Incomplete per-SSTable index build")); + } + else + { + complete = false; + } + } + + return complete; + } + /** * open index files by checking number of {@link SSTableContext} and {@link SSTableIndex}, * so transient open files during validation and files that are still open for in-flight requests will not be tracked. 
diff --git a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java index f315e0dbba..a1340c3507 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java @@ -48,13 +48,10 @@ public class StorageAttachedIndexWriter implements SSTableFlushObserver private static final Logger logger = LoggerFactory.getLogger(StorageAttachedIndexWriter.class); private final IndexDescriptor indexDescriptor; - private final Collection<StorageAttachedIndex> indexes; private final Collection<PerColumnIndexWriter> perIndexWriters; private final PerSSTableIndexWriter perSSTableWriter; private final Stopwatch stopwatch = Stopwatch.createUnstarted(); private final RowMapping rowMapping; - private final boolean propagateErrors; - private DecoratedKey currentKey; private boolean tokenOffsetWriterCompleted = false; private boolean aborted = false; @@ -65,7 +62,7 @@ public class StorageAttachedIndexWriter implements SSTableFlushObserver Collection<StorageAttachedIndex> indexes, LifecycleNewTracker lifecycleNewTracker) throws IOException { - return new StorageAttachedIndexWriter(indexDescriptor, indexes, lifecycleNewTracker, false, false); + return new StorageAttachedIndexWriter(indexDescriptor, indexes, lifecycleNewTracker, false); } @@ -74,19 +71,16 @@ public class StorageAttachedIndexWriter implements SSTableFlushObserver LifecycleNewTracker lifecycleNewTracker, boolean perIndexComponentsOnly) throws IOException { - return new StorageAttachedIndexWriter(indexDescriptor, indexes, lifecycleNewTracker, perIndexComponentsOnly, true); + return new StorageAttachedIndexWriter(indexDescriptor, indexes, lifecycleNewTracker, perIndexComponentsOnly); } private StorageAttachedIndexWriter(IndexDescriptor indexDescriptor, Collection<StorageAttachedIndex> indexes, LifecycleNewTracker 
lifecycleNewTracker, - boolean perIndexComponentsOnly, - boolean propagateErrors) throws IOException + boolean perIndexComponentsOnly) throws IOException { this.indexDescriptor = indexDescriptor; - this.indexes = indexes; this.rowMapping = RowMapping.create(lifecycleNewTracker.opType()); - this.propagateErrors = propagateErrors; this.perIndexWriters = indexes.stream().map(index -> indexDescriptor.newPerColumnIndexWriter(index, lifecycleNewTracker, rowMapping)) @@ -209,13 +203,11 @@ public class StorageAttachedIndexWriter implements SSTableFlushObserver */ public void abort(Throwable accumulator, boolean fromIndex) { + if (aborted) return; + // Mark the write operation aborted, so we can short-circuit any further operations on the component writers. aborted = true; - // Make any indexes involved in this transaction non-queryable, as they will likely not match the backing table. - if (fromIndex) - indexes.forEach(StorageAttachedIndex::makeIndexNonQueryable); - for (PerColumnIndexWriter perIndexWriter : perIndexWriters) { try @@ -237,9 +229,9 @@ public class StorageAttachedIndexWriter implements SSTableFlushObserver perSSTableWriter.abort(); } - // If the abort was from an index error and this is part of an index build operation then - // propagate the error up to the SecondaryIndexManager, so it can be correctly marked as failed. - if (fromIndex && propagateErrors) + // If the abort was from an index error, propagate the error upstream so index builds, compactions, and + // flushes can handle it correctly. 
+ if (fromIndex) throw Throwables.unchecked(accumulator); } diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java index 5aa2a985d5..88680d9b09 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java +++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java @@ -19,6 +19,7 @@ package org.apache.cassandra.index.sai.disk.format; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Set; import java.util.stream.Collectors; @@ -54,12 +55,12 @@ import org.apache.lucene.util.IOUtils; /** * The {@link IndexDescriptor} is an analog of the SSTable {@link Descriptor} and provides version * specific information about the on-disk state of a {@link StorageAttachedIndex}. - * + * <p> * The {@link IndexDescriptor} is primarily responsible for maintaining a view of the on-disk state * of an index for a specific {@link org.apache.cassandra.io.sstable.SSTable}. - * + * <p> * It is responsible for opening files for use by writers and readers. - * + * <p> * Its remaining responsibility is to act as a proxy to the {@link OnDiskFormat} associated with the * index {@link Version}. 
*/ @@ -170,13 +171,11 @@ public class IndexDescriptor return isPerColumnIndexBuildComplete(indexContext) && numberOfPerIndexComponents(indexContext) == 1; } - @SuppressWarnings("UnstableApiUsage") public void createComponentOnDisk(IndexComponent component) throws IOException { Files.touch(fileFor(component).toJavaIOFile()); } - @SuppressWarnings("UnstableApiUsage") public void createComponentOnDisk(IndexComponent component, IndexContext indexContext) throws IOException { Files.touch(fileFor(component, indexContext).toJavaIOFile()); @@ -335,7 +334,16 @@ public class IndexDescriptor logger.info(indexContext.logMessage("Validating per-column index components using mode " + validation)); boolean checksum = validation == IndexValidation.CHECKSUM; - return version.onDiskFormat().validatePerColumnIndexComponents(this, indexContext, checksum); + + try + { + version.onDiskFormat().validatePerColumnIndexComponents(this, indexContext, checksum); + return true; + } + catch (UncheckedIOException e) + { + return false; + } } @SuppressWarnings("BooleanMethodIsAlwaysInverted") @@ -346,7 +354,27 @@ public class IndexDescriptor logger.info(logMessage("Validating per-sstable index components using mode " + validation)); boolean checksum = validation == IndexValidation.CHECKSUM; - return version.onDiskFormat().validatePerSSTableIndexComponents(this, checksum); + + try + { + version.onDiskFormat().validatePerSSTableIndexComponents(this, checksum); + return true; + } + catch (UncheckedIOException e) + { + return false; + } + } + + public void checksumPerIndexComponents(IndexContext indexContext) + { + version.onDiskFormat().validatePerColumnIndexComponents(this, indexContext, true); + } + + @SuppressWarnings("BooleanMethodIsAlwaysInverted") + public void checksumPerSSTableComponents() + { + version.onDiskFormat().validatePerSSTableIndexComponents(this, true); } public void deletePerSSTableIndexComponents() diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java index cf19dceb6b..32551b3a16 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java @@ -19,6 +19,7 @@ package org.apache.cassandra.index.sai.disk.format; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Set; import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; @@ -35,7 +36,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; /** * An interface to the on-disk format of an index. This provides format agnostic methods * to read and write an on-disk format. - * + * <p> * The methods on this interface can be logically mapped into the following groups * based on their method parameters: * <ul> @@ -116,9 +117,9 @@ public interface OnDiskFormat * @param indexDescriptor The {@link IndexDescriptor} for the SSTable SAI index * @param checksum {@code true} if the checksum should be tested as part of the validation * - * @return true if all the per-SSTable components are valid + * @throws UncheckedIOException if there is a problem validating any on-disk component */ - boolean validatePerSSTableIndexComponents(IndexDescriptor indexDescriptor, boolean checksum); + void validatePerSSTableIndexComponents(IndexDescriptor indexDescriptor, boolean checksum); /** * Validate all the per-column on-disk components and throw if a component is not valid @@ -127,9 +128,9 @@ public interface OnDiskFormat * @param indexContext The {@link IndexContext} holding the per-index information for the index * @param checksum {@code true} if the checksum should be tested as part of the validation * - * @return true if all the per-column components are valid + * @throws UncheckedIOException if there is a problem validating any on-disk component */ - boolean validatePerColumnIndexComponents(IndexDescriptor 
indexDescriptor, IndexContext indexContext, boolean checksum); + void validatePerColumnIndexComponents(IndexDescriptor indexDescriptor, IndexContext indexContext, boolean checksum); /** * Returns the set of {@link IndexComponent} for the per-SSTable part of an index. diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java index 763e1af102..86dcd24675 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java @@ -19,10 +19,12 @@ package org.apache.cassandra.index.sai.disk.v1; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.EnumSet; import java.util.Set; import com.google.common.annotations.VisibleForTesting; +import org.apache.cassandra.utils.Throwables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,13 +77,13 @@ public class V1OnDiskFormat implements OnDiskFormat /** * Global limit on heap consumed by all index segment building that occurs outside the context of Memtable flush. - * + * <p> * Note that to avoid flushing small index segments, a segment is only flushed when * both the global size of all building segments has breached the limit and the size of the * segment in question reaches (segment_write_buffer_space_mb / # currently building column indexes). - * + * <p> * ex. If there is only one column index building, it can buffer up to segment_write_buffer_space_mb. - * + * <p> * ex. If there is one column index building per table across 8 compactors, each index will be * eligible to flush once it reaches (segment_write_buffer_space_mb / 8) MBs. 
*/ @@ -161,7 +163,7 @@ public class V1OnDiskFormat implements OnDiskFormat } @Override - public boolean validatePerSSTableIndexComponents(IndexDescriptor indexDescriptor, boolean checksum) + public void validatePerSSTableIndexComponents(IndexDescriptor indexDescriptor, boolean checksum) { for (IndexComponent indexComponent : perSSTableIndexComponents()) { @@ -174,22 +176,20 @@ public class V1OnDiskFormat implements OnDiskFormat else SAICodecUtils.validate(input); } - catch (Throwable e) + catch (Exception e) { - logger.error(indexDescriptor.logMessage("{} failed for index component {} on SSTable {}. Error: {}"), - checksum ? "Checksum validation" : "Validation", - indexComponent, - indexDescriptor.sstableDescriptor, - e); - return false; + logger.warn(indexDescriptor.logMessage("{} failed for index component {} on SSTable {}."), + checksum ? "Checksum validation" : "Validation", + indexComponent, + indexDescriptor.sstableDescriptor); + rethrowIOException(e); } } } - return true; } @Override - public boolean validatePerColumnIndexComponents(IndexDescriptor indexDescriptor, IndexContext indexContext, boolean checksum) + public void validatePerColumnIndexComponents(IndexDescriptor indexDescriptor, IndexContext indexContext, boolean checksum) { for (IndexComponent indexComponent : perColumnIndexComponents(indexContext)) { @@ -202,20 +202,25 @@ public class V1OnDiskFormat implements OnDiskFormat else SAICodecUtils.validate(input); } - catch (Throwable e) + catch (Exception e) { - if (logger.isDebugEnabled()) - { - logger.debug(indexDescriptor.logMessage("{} failed for index component {} on SSTable {}"), - (checksum ? "Checksum validation" : "Validation"), - indexComponent, - indexDescriptor.sstableDescriptor); - } - return false; + logger.warn(indexDescriptor.logMessage("{} failed for index component {} on SSTable {}"), + checksum ? 
"Checksum validation" : "Validation", + indexComponent, + indexDescriptor.sstableDescriptor); + rethrowIOException(e); } } } - return true; + } + + private static void rethrowIOException(Exception e) + { + if (e instanceof IOException) + throw new UncheckedIOException((IOException) e); + if (e.getCause() instanceof IOException) + throw new UncheckedIOException((IOException) e.getCause()); + throw Throwables.unchecked(e); } @Override diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java index 4668844101..1ae3a04943 100644 --- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java @@ -222,6 +222,7 @@ public class SASIIndex implements Index, INotificationConsumer }; } + @Override public boolean shouldBuildBlocking() { return true; diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index c9cc2b1c2d..05140632f9 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -207,8 +207,8 @@ public abstract class SSTableWriter extends SSTable implements Transactional public SSTableReader finish(boolean openResult) { this.setOpenResult(openResult); - txnProxy.finish(); observers.forEach(SSTableFlushObserver::complete); + txnProxy.finish(); return finished(); } @@ -232,12 +232,15 @@ public abstract class SSTableWriter extends SSTable implements Transactional { try { - return txnProxy.commit(accumulate); + observers.forEach(SSTableFlushObserver::complete); } - finally + catch (Throwable t) { - observers.forEach(SSTableFlushObserver::complete); + // Return without advancing to COMMITTED, which will trigger abort() when the Transactional closes... 
+ return Throwables.merge(accumulate, t); } + + return txnProxy.commit(accumulate); } public final Throwable abort(Throwable accumulate) diff --git a/test/distributed/org/apache/cassandra/distributed/test/sai/ImportIndexedSSTablesTest.java b/test/distributed/org/apache/cassandra/distributed/test/sai/ImportIndexedSSTablesTest.java new file mode 100644 index 0000000000..6ce6bb579e --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/sai/ImportIndexedSSTablesTest.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test.sai; + +import java.io.IOException; +import java.util.concurrent.Callable; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.compaction.CompactionInterruptedException; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.index.sai.StorageAttachedIndexBuilder; +import org.apache.cassandra.index.sai.disk.v1.SAICodecUtils; +import org.apache.cassandra.utils.Throwables; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.IndexInput; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + +public class ImportIndexedSSTablesTest extends TestBaseImpl +{ + private static Cluster cluster; + + @BeforeClass + public static void startup() throws IOException + { + cluster = init(Cluster.build(2).withConfig(c -> c.with(NETWORK, GOSSIP)) + .withInstanceInitializer(ByteBuddyHelper::installErrors) + .start()); + + cluster.disableAutoCompaction(KEYSPACE); + } + + @AfterClass + public static void shutdown() + { + if (cluster != null) + cluster.close(); + } + + @Test + public void testIndexBuildingFailureDuringImport() + { + String table = "fail_during_import_test"; + cluster.schemaChange(String.format("CREATE 
TABLE %s.%s (pk int PRIMARY KEY, v text)", KEYSPACE, table)); + + IInvokableInstance first = cluster.get(1); + first.runOnInstance(()-> ByteBuddyHelper.failValidation = false); + first.runOnInstance(()-> ByteBuddyHelper.interruptBuild = true); + + first.executeInternal(String.format("INSERT INTO %s.%s(pk, v) VALUES (?, ?)", KEYSPACE, table), 1, "v1"); + first.flush(KEYSPACE); + + Object[][] rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, table), 1); + assertThat(rs.length).isEqualTo(1); + + first.runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(table).clearUnsafe()); + + String indexName = table + "_v_index"; + cluster.schemaChange(String.format("CREATE INDEX %s ON %s.%s(v) USING 'sai'", indexName, KEYSPACE, table)); + SAIUtil.waitForIndexQueryable(cluster, KEYSPACE, indexName); + + rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, table), 1); + assertThat(rs.length).isEqualTo(0); + rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); + assertThat(rs.length).isEqualTo(0); + + first.runOnInstance(() -> + assertThatThrownBy(() -> + ColumnFamilyStore.loadNewSSTables(KEYSPACE, table)).hasRootCauseExactlyInstanceOf(CompactionInterruptedException.class)); + + rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, table), 1); + assertThat(rs.length).isEqualTo(0); + rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); + assertThat(rs.length).isEqualTo(0); + } + + @Test + public void testImportBuildsSSTableIndexes() + { + String table = "import_build_test"; + cluster.schemaChange(String.format("CREATE TABLE %s.%s (pk int PRIMARY KEY, v text)", KEYSPACE, table)); + + IInvokableInstance first = cluster.get(1); + first.runOnInstance(()-> ByteBuddyHelper.failValidation = false); + first.runOnInstance(()-> ByteBuddyHelper.interruptBuild = false); + + 
first.executeInternal(String.format("INSERT INTO %s.%s(pk, v) VALUES (?, ?)", KEYSPACE, table), 1, "v1"); + first.flush(KEYSPACE); + + Object[][] rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, table), 1); + assertThat(rs.length).isEqualTo(1); + + first.runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(table).clearUnsafe()); + + String indexName = table + "_v_index"; + cluster.schemaChange(String.format("CREATE INDEX %s ON %s.%s(v) USING 'sai'", indexName, KEYSPACE, table)); + SAIUtil.waitForIndexQueryable(cluster, KEYSPACE, indexName); + + rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, table), 1); + assertThat(rs.length).isEqualTo(0); + rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); + assertThat(rs.length).isEqualTo(0); + + first.runOnInstance(() -> ColumnFamilyStore.loadNewSSTables(KEYSPACE, table)); + + rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, table), 1); + assertThat(rs.length).isEqualTo(1); + assertThat(rs[0][0]).isEqualTo(1); + + rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); + assertThat(rs.length).isEqualTo(1); + assertThat(rs[0][0]).isEqualTo(1); + } + + @Test + public void testValidationFailureDuringImport() + { + String table = "validation_failure_test"; + cluster.schemaChange(String.format("CREATE TABLE %s.%s (pk int PRIMARY KEY, v text)", KEYSPACE, table)); + + IInvokableInstance first = cluster.get(1); + first.runOnInstance(()-> ByteBuddyHelper.failValidation = true); + first.runOnInstance(()-> ByteBuddyHelper.interruptBuild = false); + + first.executeInternal(String.format("INSERT INTO %s.%s(pk, v) VALUES (?, ?)", KEYSPACE, table), 1, "v1"); + first.flush(KEYSPACE); + + Object[][] rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, table), 1); + 
assertThat(rs.length).isEqualTo(1); + + String indexName = table + "_v_index"; + cluster.schemaChange(String.format("CREATE INDEX %s ON %s.%s(v) USING 'sai'", indexName, KEYSPACE, table)); + SAIUtil.waitForIndexQueryable(cluster, KEYSPACE, indexName); + + rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); + assertThat(rs.length).isEqualTo(1); + assertThat(rs[0][0]).isEqualTo(1); + + first.runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(table).clearUnsafe()); + + rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, table), 1); + assertThat(rs.length).isEqualTo(0); + rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); + assertThat(rs.length).isEqualTo(0); + + first.runOnInstance(() -> + assertThatThrownBy(() -> + ColumnFamilyStore.loadNewSSTables(KEYSPACE, table)).hasRootCauseExactlyInstanceOf(CorruptIndexException.class)); + + rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, table), 1); + assertThat(rs.length).isEqualTo(0); + rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); + assertThat(rs.length).isEqualTo(0); + } + + @Test + public void testImportIncludesExistingSSTableIndexes() + { + String table = "existing_indexes_test"; + cluster.schemaChange(String.format("CREATE TABLE %s.%s (pk int PRIMARY KEY, v text)", KEYSPACE, table)); + + IInvokableInstance first = cluster.get(1); + first.runOnInstance(()-> ByteBuddyHelper.failValidation = false); + first.runOnInstance(()-> ByteBuddyHelper.interruptBuild = false); + + first.executeInternal(String.format("INSERT INTO %s.%s(pk, v) VALUES (?, ?)", KEYSPACE, table), 1, "v1"); + first.flush(KEYSPACE); + + Object[][] rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, table), 1); + assertThat(rs.length).isEqualTo(1); + + String indexName = table + 
"_v_index"; + cluster.schemaChange(String.format("CREATE INDEX %s ON %s.%s(v) USING 'sai'", indexName, KEYSPACE, table)); + SAIUtil.waitForIndexQueryable(cluster, KEYSPACE, indexName); + + rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); + assertThat(rs.length).isEqualTo(1); + assertThat(rs[0][0]).isEqualTo(1); + + first.runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(table).clearUnsafe()); + + rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, table), 1); + assertThat(rs.length).isEqualTo(0); + rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); + assertThat(rs.length).isEqualTo(0); + + first.runOnInstance(() -> ColumnFamilyStore.loadNewSSTables(KEYSPACE, table)); + + rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, table), 1); + assertThat(rs.length).isEqualTo(1); + assertThat(rs[0][0]).isEqualTo(1); + + rs = first.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); + assertThat(rs.length).isEqualTo(1); + assertThat(rs[0][0]).isEqualTo(1); + } + + public static class ByteBuddyHelper + { + static volatile boolean interruptBuild = false; + static volatile boolean failValidation = false; + + @SuppressWarnings("resource") + static void installErrors(ClassLoader loader, int node) + { + new ByteBuddy().rebase(StorageAttachedIndexBuilder.class) + .method(named("isStopRequested")) + .intercept(MethodDelegation.to(ByteBuddyHelper.class)) + .make() + .load(loader, ClassLoadingStrategy.Default.INJECTION); + + new ByteBuddy().rebase(SAICodecUtils.class) + .method(named("validateChecksum")) + .intercept(MethodDelegation.to(ByteBuddyHelper.class)) + .make() + .load(loader, ClassLoadingStrategy.Default.INJECTION); + } + + @SuppressWarnings("unused") + public static boolean isStopRequested(@SuperCall Callable<Boolean> zuper) + { + if 
(interruptBuild) + return true; + + try + { + return zuper.call(); + } + catch (Exception e) + { + throw Throwables.unchecked(e); + } + } + + @SuppressWarnings("unused") + public static void validateChecksum(IndexInput input, @SuperCall Callable<Void> zuper) throws IOException + { + if (failValidation) + throw new CorruptIndexException("Injected failure!", "Test resource"); + + try + { + zuper.call(); + } + catch (Exception e) + { + throw Throwables.unchecked(e); + } + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingFailureTest.java b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingFailureTest.java new file mode 100644 index 0000000000..c3ff4a528f --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingFailureTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test.sai; + +import java.io.IOException; +import java.util.concurrent.Callable; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.index.sai.IndexContext; +import org.apache.cassandra.index.sai.disk.format.IndexDescriptor; +import org.apache.cassandra.index.sai.disk.v1.SAICodecUtils; +import org.apache.cassandra.index.sai.disk.v1.segment.SegmentBuilder; +import org.apache.cassandra.index.sai.disk.v1.segment.SegmentMetadata; +import org.apache.cassandra.utils.Throwables; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.IndexInput; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertFalse; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + +public class IndexStreamingFailureTest extends TestBaseImpl +{ + public static final String TEST_ERROR_MESSAGE = "Injected failure!"; + + private static Cluster cluster; + + @BeforeClass + public static void startup() throws IOException + { + cluster = init(Cluster.build(2).withConfig(c -> c.with(NETWORK, GOSSIP)) + .withInstanceInitializer(ByteBuddyHelper::installErrors) + .start()); + + cluster.disableAutoCompaction(KEYSPACE); + } + + @AfterClass + public static void shutdown() + { + if (cluster != null) + cluster.close(); + } + + @Test + public void 
testAvailabilityAfterFailedNonEntireFileStreaming() throws Exception + { + cluster.get(2).runOnInstance(()-> ByteBuddyHelper.failFlush = true); + cluster.get(2).runOnInstance(()-> ByteBuddyHelper.failValidation = false); + testAvailabilityAfterStreaming("non_entire_file_test", false); + } + + @Test + public void testAvailabilityAfterFailedEntireFileStreaming() throws Exception + { + cluster.get(2).runOnInstance(()-> ByteBuddyHelper.failFlush = false); + cluster.get(2).runOnInstance(()-> ByteBuddyHelper.failValidation = true); + testAvailabilityAfterStreaming("entire_file_test", true); + } + + private void testAvailabilityAfterStreaming(String table, boolean streamEntireSSTables) throws Exception + { + String indexName = table + "_v_index"; + cluster.schemaChange(String.format("CREATE TABLE %s.%s (pk int PRIMARY KEY, v text)", KEYSPACE, table)); + cluster.schemaChange(String.format("CREATE INDEX %s ON %s.%s(v) USING 'sai'", indexName, KEYSPACE, table)); + SAIUtil.waitForIndexQueryable(cluster, KEYSPACE, indexName); + + IInvokableInstance first = cluster.get(1); + IInvokableInstance second = cluster.get(2); + first.runOnInstance(()-> DatabaseDescriptor.setStreamEntireSSTables(streamEntireSSTables)); + second.runOnInstance(()-> DatabaseDescriptor.setStreamEntireSSTables(streamEntireSSTables)); + + first.executeInternal(String.format("INSERT INTO %s.%s(pk, v) VALUES (?, ?)", KEYSPACE, table), 1, "v1"); + first.flush(KEYSPACE); + + Object[][] rs = second.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); + assertThat(rs.length).isEqualTo(0); + + // The repair job should fail when index completion fails. This should also fail the streaming transaction. 
+ long mark = second.logs().mark(); + second.nodetoolResult("repair", KEYSPACE).asserts().failure(); + assertFalse("There should be an injected failure in the logs.", second.logs().grep(mark, TEST_ERROR_MESSAGE).getResult().isEmpty()); + + // The SSTable should not be added to the table view, as the streaming transaction failed... + rs = second.executeInternal(String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, table), 1); + assertThat(rs.length).isEqualTo(0); + + // ...and querying the index also returns nothing, as the index for the streamed SSTable was never built. + rs = second.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); + assertThat(rs.length).isEqualTo(0); + + // On restart, ensure that the index remains queryable and does not include the data we attempted to stream. + second.shutdown().get(); + second.startup(); + + // On restart, the base table should be unchanged... + rs = second.executeInternal(String.format("SELECT pk FROM %s.%s WHERE pk = ?", KEYSPACE, table), 1); + assertThat(rs.length).isEqualTo(0); + + // ...and the index should remain queryable, because from its perspective, the streaming never happened.
+ rs = second.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); + assertThat(rs.length).isEqualTo(0); + + // Disable failure injection, and verify that the index is queryable and has the newly streamed data: + second.runOnInstance(()-> ByteBuddyHelper.failFlush = false); + second.runOnInstance(()-> ByteBuddyHelper.failValidation = false); + second.nodetoolResult("repair", KEYSPACE).asserts().success(); + + rs = second.executeInternal(String.format("SELECT pk FROM %s.%s WHERE v = ?", KEYSPACE, table), "v1"); + assertThat(rs.length).isEqualTo(1); + } + + public static class ByteBuddyHelper + { + volatile static boolean failFlush = false; + volatile static boolean failValidation = false; + + @SuppressWarnings("resource") + static void installErrors(ClassLoader loader, int node) + { + if (node == 2) + { + new ByteBuddy().rebase(SegmentBuilder.class) + .method(named("flush")) + .intercept(MethodDelegation.to(ByteBuddyHelper.class)) + .make() + .load(loader, ClassLoadingStrategy.Default.INJECTION); + + new ByteBuddy().rebase(SAICodecUtils.class) + .method(named("validateChecksum")) + .intercept(MethodDelegation.to(ByteBuddyHelper.class)) + .make() + .load(loader, ClassLoadingStrategy.Default.INJECTION); + } + } + + @SuppressWarnings("unused") + public static SegmentMetadata flush(IndexDescriptor indexDescriptor, IndexContext indexContext, @SuperCall Callable<SegmentMetadata> zuper) throws IOException + { + if (failFlush) + throw new IOException(TEST_ERROR_MESSAGE); + + try + { + return zuper.call(); + } + catch (Exception e) + { + throw Throwables.unchecked(e); + } + } + + @SuppressWarnings("unused") + public static void validateChecksum(IndexInput input, @SuperCall Callable<Void> zuper) throws IOException + { + if (failValidation) + throw new CorruptIndexException(TEST_ERROR_MESSAGE, "Test resource"); + + try + { + zuper.call(); + } + catch (Exception e) + { + throw Throwables.unchecked(e); + } + } + } +} diff --git 
a/test/unit/org/apache/cassandra/index/sai/SAITester.java b/test/unit/org/apache/cassandra/index/sai/SAITester.java index 4f147dc4b8..35c4c6c5d6 100644 --- a/test/unit/org/apache/cassandra/index/sai/SAITester.java +++ b/test/unit/org/apache/cassandra/index/sai/SAITester.java @@ -693,7 +693,7 @@ public abstract class SAITester extends CQLTester } } - protected void assertNumRows(int expected, String query, Object... args) throws Throwable + protected void assertNumRows(int expected, String query, Object... args) { ResultSet rs = executeNet(String.format(query, args)); assertEquals(expected, rs.all().size()); diff --git a/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java b/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java index 43635c2e7a..58d530feff 100644 --- a/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java +++ b/test/unit/org/apache/cassandra/index/sai/cql/StorageAttachedIndexDDLTest.java @@ -133,7 +133,7 @@ public class StorageAttachedIndexDDLTest extends SAITester } @Test - public void shouldFailUnsupportedType() throws Throwable + public void shouldFailUnsupportedType() { for (CQL3Type.Native cql3Type : CQL3Type.Native.values()) { @@ -229,7 +229,7 @@ public class StorageAttachedIndexDDLTest extends SAITester } @Test - public void shouldNotFailCreateWithTupleType() throws Throwable + public void shouldNotFailCreateWithTupleType() { createTable("CREATE TABLE %s (id text PRIMARY KEY, val tuple<text, int, double>)"); @@ -485,7 +485,7 @@ public class StorageAttachedIndexDDLTest extends SAITester } @Test - public void shouldFailCreationMultipleIndexesOnSimpleColumn() throws Throwable + public void shouldFailCreationMultipleIndexesOnSimpleColumn() { createTable("CREATE TABLE %s (id int PRIMARY KEY, v1 TEXT)"); execute("INSERT INTO %s (id, v1) VALUES(1, '1')"); @@ -514,7 +514,7 @@ public class StorageAttachedIndexDDLTest extends SAITester } @Test - public void 
shouldIndexBuildingWithInMemoryData() throws Throwable + public void shouldIndexBuildingWithInMemoryData() { createTable(CREATE_TABLE_TEMPLATE); @@ -644,7 +644,7 @@ public class StorageAttachedIndexDDLTest extends SAITester } @Test - public void shouldCreateIndexFilesAfterMultipleConcurrentIndexCreation() throws Throwable + public void shouldCreateIndexFilesAfterMultipleConcurrentIndexCreation() { createTable(CREATE_TABLE_TEMPLATE); verifyNoIndexFiles(); @@ -671,7 +671,7 @@ public class StorageAttachedIndexDDLTest extends SAITester } @Test - public void shouldCreateIndexFilesAfterMultipleSequentialIndexCreation() throws Throwable + public void shouldCreateIndexFilesAfterMultipleSequentialIndexCreation() { createTable(CREATE_TABLE_TEMPLATE); verifyNoIndexFiles(); @@ -703,7 +703,7 @@ public class StorageAttachedIndexDDLTest extends SAITester } @Test - public void shouldReleaseIndexFilesAfterCompaction() throws Throwable + public void shouldReleaseIndexFilesAfterCompaction() { createTable(CREATE_TABLE_TEMPLATE); disableCompaction(KEYSPACE); @@ -741,18 +741,18 @@ public class StorageAttachedIndexDDLTest extends SAITester } @Test - public void truncateWithBuiltIndexes() throws Throwable + public void truncateWithBuiltIndexes() { verifyTruncateWithIndex(false); } @Test - public void concurrentTruncateWithIndexBuilding() throws Throwable + public void concurrentTruncateWithIndexBuilding() { verifyTruncateWithIndex(true); } - private void verifyTruncateWithIndex(boolean concurrentTruncate) throws Throwable + private void verifyTruncateWithIndex(boolean concurrentTruncate) { createTable(CREATE_TABLE_TEMPLATE); @@ -881,8 +881,7 @@ public class StorageAttachedIndexDDLTest extends SAITester boolean expectedLiteralState = !failedStringIndex || isBuildCompletionMarker(component); assertEquals("Checksum verification for " + component + " should be " + expectedNumericState + " but was " + !expectedNumericState, - expectedNumericState, - verifyChecksum(numericIndexContext)); + 
expectedNumericState, verifyChecksum(numericIndexContext)); assertEquals(expectedLiteralState, verifyChecksum(stringIndexContext)); if (rebuild) @@ -1006,7 +1005,7 @@ public class StorageAttachedIndexDDLTest extends SAITester } @Test - public void verifyFlushAndCompactEmptyIndex() throws Throwable + public void verifyFlushAndCompactEmptyIndex() { createTable(CREATE_TABLE_TEMPLATE); disableCompaction(KEYSPACE); @@ -1042,7 +1041,7 @@ public class StorageAttachedIndexDDLTest extends SAITester } @Test - public void verifyFlushAndCompactNonIndexableRows() throws Throwable + public void verifyFlushAndCompactNonIndexableRows() { // valid row ids, but no valid indexable content Runnable populateData = () -> { @@ -1065,7 +1064,7 @@ public class StorageAttachedIndexDDLTest extends SAITester } @Test - public void verifyFlushAndCompactTombstones() throws Throwable + public void verifyFlushAndCompactTombstones() { // no valid row ids Runnable populateData = () -> { @@ -1086,7 +1085,7 @@ public class StorageAttachedIndexDDLTest extends SAITester verifyFlushAndCompactEmptyIndexes(populateData); } - private void verifyFlushAndCompactEmptyIndexes(Runnable populateData) throws Throwable + private void verifyFlushAndCompactEmptyIndexes(Runnable populateData) { createTable(CREATE_TABLE_TEMPLATE); disableCompaction(KEYSPACE); diff --git a/test/unit/org/apache/cassandra/index/sai/metrics/SegmentFlushingFailureTester.java b/test/unit/org/apache/cassandra/index/sai/metrics/SegmentFlushingFailureTester.java index 378bbd4391..4b98a94671 100644 --- a/test/unit/org/apache/cassandra/index/sai/metrics/SegmentFlushingFailureTester.java +++ b/test/unit/org/apache/cassandra/index/sai/metrics/SegmentFlushingFailureTester.java @@ -18,15 +18,19 @@ package org.apache.cassandra.index.sai.metrics; import java.io.IOException; -import java.util.Arrays; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import 
java.util.concurrent.Future; +import org.apache.cassandra.utils.FBUtilities; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.exceptions.ReadFailureException; import org.apache.cassandra.config.StorageAttachedIndexOptions; import org.apache.cassandra.index.sai.SAITester; import org.apache.cassandra.index.sai.disk.v1.SSTableComponentsWriter; @@ -90,7 +94,7 @@ public abstract class SegmentFlushingFailureTester extends SAITester protected abstract long expectedBytesLimit(); @Test - public void testSegmentMemoryTrackerLifecycle() throws Throwable + public void testSegmentMemoryTrackerLifecycle() { createTable(CREATE_TABLE_TEMPLATE); createIndex(String.format(CREATE_INDEX_TEMPLATE, "v1")); @@ -157,9 +161,10 @@ public abstract class SegmentFlushingFailureTester extends SAITester // Verify that we abort exactly once and zero the memory tracker: verifyCompactionIndexBuilds(1, failure, currentTable()); + // We should still be able to query the index if compaction is aborted: String select = String.format("SELECT * FROM %%s WHERE %s = %s", column, column.equals("v1") ? "0" : "'0'"); - - assertThatThrownBy(() -> executeNet(select)).isInstanceOf(ReadFailureException.class); + ResultSet rows = executeNet(select); + assertEquals(1, rows.all().size()); } @Test @@ -187,11 +192,11 @@ public abstract class SegmentFlushingFailureTester extends SAITester // Start compaction against both tables/indexes and verify that they are aborted safely: verifyCompactionIndexBuilds(2, segmentFlushFailure, table1, table2); - assertThatThrownBy(() -> executeNet(String.format("SELECT * FROM %s WHERE v1 = 0", KEYSPACE + "." + table1))) - .isInstanceOf(ReadFailureException.class); - - assertThatThrownBy(() -> executeNet(String.format("SELECT * FROM %s WHERE v1 = 0", KEYSPACE + "." 
+ table2))) - .isInstanceOf(ReadFailureException.class); + // We should still be able to query the indexes if compaction is aborted: + ResultSet rows = executeNet(String.format("SELECT * FROM %s WHERE v1 = 0", KEYSPACE + "." + table1)); + assertEquals(1, rows.all().size()); + rows = executeNet(String.format("SELECT * FROM %s WHERE v1 = 0", KEYSPACE + "." + table2)); + assertEquals(1, rows.all().size()); } private void verifyCompactionIndexBuilds(int aborts, Injection failure, String... tables) throws Throwable @@ -201,7 +206,14 @@ public abstract class SegmentFlushingFailureTester extends SAITester try { - Arrays.stream(tables).forEach(table -> compact(KEYSPACE, table)); + ExecutorService executor = Executors.newFixedThreadPool(tables.length); + List<Future<?>> results = new ArrayList<>(); + + for (String table : tables) + results.add(executor.submit(() -> compact(KEYSPACE, table))); + + assertThatThrownBy(() -> FBUtilities.waitOnFutures(results)).hasRootCauseMessage("Injected failure!"); + executor.shutdownNow(); Assert.assertEquals(aborts, writerAbortCounter.get()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org