This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new f6c1002e44 Stream individual files in their own transactions and hand over ownership to a parent transaction on completion f6c1002e44 is described below commit f6c1002e44ed73fbcf940242217f5d20e0bf2d7d Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Wed Jun 11 08:08:59 2025 +0200 Stream individual files in their own transactions and hand over ownership to a parent transaction on completion patch by Marcus Eriksson; reviewed by Caleb Rackliffe and Jon Meredith for CASSANDRA-20728 --- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 77 ++++++++--- .../db/compaction/AbstractCompactionStrategy.java | 6 +- .../db/compaction/AbstractStrategyHolder.java | 4 +- .../db/compaction/CompactionStrategyHolder.java | 6 +- .../db/compaction/CompactionStrategyManager.java | 6 +- .../db/compaction/PendingRepairHolder.java | 6 +- .../db/compaction/UnifiedCompactionStrategy.java | 6 +- .../db/compaction/unified/ShardedMultiWriter.java | 12 +- .../db/lifecycle/ILifecycleTransaction.java | 7 +- .../db/lifecycle/LifecycleNewTracker.java | 47 ------- .../db/lifecycle/LifecycleTransaction.java | 18 +-- .../org/apache/cassandra/db/lifecycle/LogFile.java | 15 ++ .../cassandra/db/lifecycle/LogTransaction.java | 12 ++ .../lifecycle/StreamingLifecycleTransaction.java | 68 +++++++++ .../CassandraEntireSSTableStreamReader.java | 50 +++++-- .../db/streaming/CassandraIncomingFile.java | 12 +- .../db/streaming/CassandraStreamReader.java | 17 ++- .../db/streaming/CassandraStreamReceiver.java | 50 +------ src/java/org/apache/cassandra/index/Index.java | 10 +- .../org/apache/cassandra/index/IndexRegistry.java | 4 +- .../cassandra/index/SingletonIndexGroup.java | 6 +- .../index/accord/MemtableIndexManager.java | 4 +- .../cassandra/index/accord/RouteJournalIndex.java | 8 +- .../index/accord/RouteMemtableIndexManager.java | 6 +- .../cassandra/index/sai/StorageAttachedIndex.java | 4 +- 
.../index/sai/StorageAttachedIndexGroup.java | 6 +- .../index/sai/disk/StorageAttachedIndexWriter.java | 16 +-- .../index/sai/disk/format/IndexDescriptor.java | 6 +- .../index/sai/disk/format/OnDiskFormat.java | 8 +- .../index/sai/disk/v1/V1OnDiskFormat.java | 8 +- .../index/sai/memory/MemtableIndexManager.java | 6 +- .../org/apache/cassandra/index/sasi/SASIIndex.java | 6 +- .../io/sstable/RangeAwareSSTableWriter.java | 20 ++- .../io/sstable/SSTableTxnSingleStreamWriter.java | 145 +++++++++++++++++++ .../io/sstable/SSTableZeroCopyWriter.java | 7 +- .../io/sstable/SimpleSSTableMultiWriter.java | 16 +-- .../cassandra/io/sstable/format/SSTableFormat.java | 3 +- .../cassandra/io/sstable/format/SSTableWriter.java | 30 ++-- .../io/sstable/format/SortedTableWriter.java | 6 +- .../io/sstable/format/big/BigTableWriter.java | 16 +-- .../io/sstable/format/bti/BtiTableWriter.java | 12 +- .../apache/cassandra/streaming/IncomingStream.java | 2 +- .../streaming/StreamDeserializingTask.java | 5 +- .../IndexBuildFailsAfterStreamingTest.java | 99 +++++++++++++ .../streaming/StreamFailedAfterReceivingTest.java | 154 +++++++++++++++++++++ .../streaming/StreamFailedAfterTransferTest.java | 127 +++++++++++++++++ .../db/lifecycle/LifecycleTransactionTest.java | 51 +++++++ .../CassandraEntireSSTableStreamWriterTest.java | 10 +- ...TableStreamConcurrentComponentMutationTest.java | 8 +- .../apache/cassandra/index/CustomIndexTest.java | 8 +- .../org/apache/cassandra/index/StubIndexGroup.java | 4 +- .../org/apache/cassandra/io/sstable/ScrubTest.java | 6 +- .../cassandra/streaming/StreamReaderTest.java | 3 +- 54 files changed, 970 insertions(+), 280 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 6fb94d9626..1efac73e11 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Stream individual files in their own transactions and hand over ownership to a parent transaction on completion (CASSANDRA-20728) * Limit the number of held heap dumps to not consume disk space 
excessively (CASSANDRA-20457) * Accord: BEGIN TRANSACTIONs IF condition logic does not properly support meaningless emptiness and null values (CASSANDRA-20667) * Accord: startup race condition where accord journal tries to access the 2i index before its ready (CASSANDRA-20686) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index e81c0d7be8..a499bc42f2 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -85,7 +85,7 @@ import org.apache.cassandra.db.compaction.CompactionStrategyManager; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.ClusteringIndexFilter; import org.apache.cassandra.db.filter.DataLimits; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.Tracker; @@ -652,19 +652,19 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner return memtableFactory.streamFromMemtable(); } - public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient, SerializationHeader header, ILifecycleTransaction txn) { - return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, null, 0, header, lifecycleNewTracker); + return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, null, 0, header, txn); } - public SSTableMultiWriter 
createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient, IntervalSet<CommitLogPosition> commitLogPositions, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient, IntervalSet<CommitLogPosition> commitLogPositions, SerializationHeader header, ILifecycleTransaction txn) { - return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, commitLogPositions, 0, header, lifecycleNewTracker); + return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, commitLogPositions, 0, header, txn); } - public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient, IntervalSet<CommitLogPosition> commitLogPositions, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient, IntervalSet<CommitLogPosition> commitLogPositions, int sstableLevel, SerializationHeader header, ILifecycleTransaction txn) { - return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, commitLogPositions, sstableLevel, header, indexManager.listIndexGroups(), lifecycleNewTracker); + return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, commitLogPositions, sstableLevel, header, indexManager.listIndexGroups(), txn); } public boolean supportsEarlyOpen() @@ -2410,22 +2410,67 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner } } - private static final LifecycleNewTracker DO_NOT_TRACK = new 
LifecycleNewTracker() + private static final ILifecycleTransaction DO_NOT_TRACK = new ILifecycleTransaction() { - public void trackNew(SSTable table) + @Override + public void trackNew(SSTable sstable) { - // not tracking - } - public void untrackNew(SSTable table) - { - // not tracking } - public OperationType opType() + @Override + public void untrackNew(SSTable sstable) { - return OperationType.FLUSH; + } + + @Override + public OperationType opType() {return null;} + + @Override + public void checkpoint() {} + + @Override + public void update(SSTableReader reader, boolean original) {} + + @Override + public void update(Collection<SSTableReader> readers, boolean original) {} + + @Override + public SSTableReader current(SSTableReader reader) {return null;} + + @Override + public void obsolete(SSTableReader reader) {} + + @Override + public void obsoleteOriginals() {} + + @Override + public Set<SSTableReader> originals() {return Set.of();} + + @Override + public boolean isObsolete(SSTableReader reader) {return false;} + + @Override + public boolean isOffline() {return false;} + + @Override + public TimeUUID opId() {return null;} + + @Override + public void cancel(SSTableReader removedSSTable) {} + + @Override + public Throwable commit(Throwable accumulate) {return null;} + + @Override + public Throwable abort(Throwable accumulate) {return null;} + + @Override + public void prepareToCommit() {} + + @Override + public void close() {} }; /** diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 99a509c5f2..def444f9a9 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -36,7 +36,7 @@ import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.SerializationHeader; import 
org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.commitlog.IntervalSet; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -562,7 +562,7 @@ public abstract class AbstractCompactionStrategy int sstableLevel, SerializationHeader header, Collection<Index.Group> indexGroups, - LifecycleNewTracker lifecycleNewTracker) + ILifecycleTransaction txn) { return SimpleSSTableMultiWriter.create(descriptor, keyCount, @@ -574,7 +574,7 @@ public abstract class AbstractCompactionStrategy sstableLevel, header, indexGroups, - lifecycleNewTracker, cfs); + txn, cfs); } public boolean supportsEarlyOpen() diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java index 2db7e1db60..4bc7146f83 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java @@ -31,7 +31,7 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.commitlog.IntervalSet; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.index.Index; @@ -200,7 +200,7 @@ public abstract class AbstractStrategyHolder int sstableLevel, SerializationHeader header, Collection<Index.Group> indexGroups, - LifecycleNewTracker lifecycleNewTracker); + ILifecycleTransaction txn); /** * Return the directory index the given compaction strategy belongs to, or -1 diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java index 9bc4d6d7cb..af5c749929 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java @@ -29,7 +29,7 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.commitlog.IntervalSet; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.index.Index; @@ -230,7 +230,7 @@ public class CompactionStrategyHolder extends AbstractStrategyHolder int sstableLevel, SerializationHeader header, Collection<Index.Group> indexGroups, - LifecycleNewTracker lifecycleNewTracker) + ILifecycleTransaction txn) { if (isRepaired) { @@ -255,7 +255,7 @@ public class CompactionStrategyHolder extends AbstractStrategyHolder sstableLevel, header, indexGroups, - lifecycleNewTracker); + txn); } @Override diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index dfb908425a..b8eaa5bd81 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -54,7 +54,7 @@ import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.commitlog.IntervalSet; import org.apache.cassandra.db.compaction.AbstractStrategyHolder.TaskSupplier; import org.apache.cassandra.db.compaction.PendingRepairManager.CleanupTask; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import 
org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.dht.Range; @@ -1257,7 +1257,7 @@ public class CompactionStrategyManager implements INotificationConsumer int sstableLevel, SerializationHeader header, Collection<Index.Group> indexGroups, - LifecycleNewTracker lifecycleNewTracker) + ILifecycleTransaction txn) { SSTable.validateRepairedMetadata(repairedAt, pendingRepair, isTransient); maybeReloadDiskBoundaries(); @@ -1273,7 +1273,7 @@ public class CompactionStrategyManager implements INotificationConsumer sstableLevel, header, indexGroups, - lifecycleNewTracker); + txn); } finally { diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java index 1f2ff4a1a1..45e8753394 100644 --- a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java +++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java @@ -30,7 +30,7 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.commitlog.IntervalSet; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.index.Index; @@ -250,7 +250,7 @@ public class PendingRepairHolder extends AbstractStrategyHolder int sstableLevel, SerializationHeader header, Collection<Index.Group> indexGroups, - LifecycleNewTracker lifecycleNewTracker) + ILifecycleTransaction txn) { Preconditions.checkArgument(repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE, "PendingRepairHolder can't create sstablewriter with repaired at set"); @@ -267,7 +267,7 @@ public class 
PendingRepairHolder extends AbstractStrategyHolder sstableLevel, header, indexGroups, - lifecycleNewTracker); + txn); } @Override diff --git a/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java index c23e600e7b..d44a9f3288 100644 --- a/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java @@ -46,7 +46,7 @@ import org.apache.cassandra.db.compaction.unified.Controller; import org.apache.cassandra.db.compaction.unified.ShardedMultiWriter; import org.apache.cassandra.db.compaction.unified.UnifiedCompactionTask; import org.apache.cassandra.db.lifecycle.CompositeLifecycleTransaction; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.lifecycle.PartialLifecycleTransaction; import org.apache.cassandra.exceptions.ConfigurationException; @@ -303,7 +303,7 @@ public class UnifiedCompactionStrategy extends AbstractCompactionStrategy int sstableLevel, SerializationHeader header, Collection<Index.Group> indexGroups, - LifecycleNewTracker lifecycleNewTracker) + ILifecycleTransaction txn) { ShardManager shardManager = getShardManager(); double flushDensity = cfs.metric.flushSizeOnDisk.get() * shardManager.shardSetCoverage() / shardManager.localSpaceCoverage(); @@ -317,7 +317,7 @@ public class UnifiedCompactionStrategy extends AbstractCompactionStrategy commitLogPositions, header, indexGroups, - lifecycleNewTracker, + txn, boundaries); } diff --git a/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java b/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java index a5b5df9e49..ab5465df25 100644 --- 
a/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java @@ -31,7 +31,7 @@ import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.commitlog.IntervalSet; import org.apache.cassandra.db.compaction.ShardTracker; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.index.Index; import org.apache.cassandra.io.sstable.Descriptor; @@ -64,7 +64,7 @@ public class ShardedMultiWriter implements SSTableMultiWriter private final IntervalSet<CommitLogPosition> commitLogPositions; private final SerializationHeader header; private final Collection<Index.Group> indexGroups; - private final LifecycleNewTracker lifecycleNewTracker; + private final ILifecycleTransaction txn; private final ShardTracker boundaries; private final SSTableWriter[] writers; private int currentWriter; @@ -78,7 +78,7 @@ public class ShardedMultiWriter implements SSTableMultiWriter IntervalSet<CommitLogPosition> commitLogPositions, SerializationHeader header, Collection<Index.Group> indexGroups, - LifecycleNewTracker lifecycleNewTracker, + ILifecycleTransaction txn, ShardTracker boundaries) { this.cfs = cfs; @@ -90,7 +90,7 @@ public class ShardedMultiWriter implements SSTableMultiWriter this.commitLogPositions = commitLogPositions; this.header = header; this.indexGroups = indexGroups; - this.lifecycleNewTracker = lifecycleNewTracker; + this.txn = txn; this.boundaries = boundaries; this.writers = new SSTableWriter[this.boundaries.count()]; // at least one @@ -118,7 +118,7 @@ public class ShardedMultiWriter implements SSTableMultiWriter .setSerializationHeader(header) .addDefaultComponents(indexGroups) .setSecondaryIndexGroups(indexGroups) - 
.build(lifecycleNewTracker, cfs); + .build(txn, cfs); } private long forSplittingKeysBy(long splits) { @@ -227,7 +227,7 @@ public class ShardedMultiWriter implements SSTableMultiWriter for (SSTableWriter writer : writers) if (writer != null) { - lifecycleNewTracker.untrackNew(writer); + txn.untrackNew(writer); t = writer.abort(t); } return t; diff --git a/src/java/org/apache/cassandra/db/lifecycle/ILifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/ILifecycleTransaction.java index f3e9c9b7c5..5da8447917 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/ILifecycleTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/ILifecycleTransaction.java @@ -23,13 +23,18 @@ import java.util.Set; import com.google.common.collect.Iterables; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.TimeUUID; import org.apache.cassandra.utils.concurrent.Transactional; -public interface ILifecycleTransaction extends Transactional, LifecycleNewTracker +public interface ILifecycleTransaction extends Transactional { + void trackNew(SSTable sstable); + void untrackNew(SSTable sstable); + OperationType opType(); void checkpoint(); void update(SSTableReader reader, boolean original); void update(Collection<SSTableReader> readers, boolean original); diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java deleted file mode 100644 index 9a0785c43f..0000000000 --- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.cassandra.db.lifecycle; - -import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.io.sstable.SSTable; - -/** - * An interface for tracking new sstables added to a LifecycleTransaction, possibly through some proxy. - */ -public interface LifecycleNewTracker -{ - /** - * Called when a new table is about to be created, so that this table can be tracked by a transaction. - * @param table - the new table to be tracked - */ - void trackNew(SSTable table); - - - /** - * Called when a new table is no longer required, so that this table can be untracked by a transaction. 
- * @param table - the table to be untracked - */ - void untrackNew(SSTable table); - - /** - * @return the type of operation tracking these sstables - */ - OperationType opType(); -} diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java index 15b0417b38..48b0bc1ffb 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java @@ -205,12 +205,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im } } - public LogTransaction log() - { - return log; - } - - @Override //LifecycleNewTracker + @Override public OperationType opType() { return log.type(); @@ -322,6 +317,14 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im return accumulate; } + void takeOwnership(ILifecycleTransaction txn) + { + LifecycleTransaction ltn = (LifecycleTransaction)txn; + if (!ltn.obsoletions.isEmpty() || !ltn.originals.isEmpty() || !ltn.logged.isEmpty()) + throw new IllegalStateException("takeOwnership is only supported in add-only transactions (streams)"); + log.takeOwnership(ltn.log); + } + private Throwable runOnCommitHooks(Throwable accumulate) { return runHooks(commitHooks, accumulate); @@ -646,8 +649,6 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im return getFirst(originals, null); } - // LifecycleNewTracker - @Override public void trackNew(SSTable table) { @@ -659,7 +660,6 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im { log.untrackNew(table); } - public static boolean removeUnfinishedLeftovers(ColumnFamilyStore cfs) { return LogTransaction.removeUnfinishedLeftovers(cfs.getDirectories().getCFDirectories()); diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java index 
13436b112a..08ba257f16 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java @@ -552,4 +552,19 @@ final class LogFile implements AutoCloseable { return records.isEmpty(); } + + public void takeOwnership(LogFile txnFile) + { + if (completed) + throw TransactionAlreadyCompletedException.create(getFiles()); + for (LogRecord record : txnFile.records) + { + if (record.type != Type.ADD) + throw new IllegalStateException("Can only transfer ADD records - not " + record + " - " + txnFile.records); + File directory = new File(record.absolutePath.get()).parent(); + String fileName = StringUtils.join(directory, File.pathSeparator(), getFileName()); + replicas.maybeCreateReplica(directory, fileName, onDiskRecords); + addRecord(record); + } + } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java index 92766de385..b675ad238b 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java @@ -209,6 +209,18 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran } } + void takeOwnership(LogTransaction log) + { + synchronized (lock) + { + if (state() != State.IN_PROGRESS) + throw new IllegalStateException("The LogTransaction getting ownership should be IN_PROGRESS, not " + state()); + if (log.state() != State.READY_TO_COMMIT) + throw new IllegalStateException("The LogTransaction giving up its ownership should be READY_TO_COMMIT, not " + log.state()); + txnFile.takeOwnership(log.txnFile); + } + } + Map<SSTable, LogRecord> makeRemoveRecords(Iterable<SSTableReader> sstables) { synchronized (lock) diff --git a/src/java/org/apache/cassandra/db/lifecycle/StreamingLifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/StreamingLifecycleTransaction.java new file mode 100644 
index 0000000000..38145b6132 --- /dev/null +++ b/src/java/org/apache/cassandra/db/lifecycle/StreamingLifecycleTransaction.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.lifecycle; + +import java.util.Collection; + +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.io.sstable.format.SSTableReader; + +import static org.apache.cassandra.utils.Throwables.maybeFail; + +/** + * Special, restricted LifecycleTransaction for streaming, synchronizes access to the shared transaction + * and adds a method to take ownership of a "child-transaction". + * + * Each incoming file is now its own normal LifecycleTransaction. 
+ */ +public class StreamingLifecycleTransaction +{ + private final LifecycleTransaction sharedTxn; + + public StreamingLifecycleTransaction() + { + this.sharedTxn = LifecycleTransaction.offline(OperationType.STREAM); + } + + public synchronized Throwable commit(Throwable accumulate) + { + return sharedTxn.commit(accumulate); + } + + public synchronized void update(Collection<SSTableReader> readers) + { + sharedTxn.update(readers, false); + } + + public synchronized void abort() + { + maybeFail(sharedTxn.abort(null)); + } + + public synchronized void finish() + { + sharedTxn.prepareToCommit(); + sharedTxn.commit(); + } + + public synchronized void takeOwnership(ILifecycleTransaction txn) + { + sharedTxn.takeOwnership(txn); + } +} diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java index 97c3b2d4f9..f2f1784a9c 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.streaming; import java.io.IOException; +import java.nio.channels.ClosedChannelException; import java.util.Collection; import java.util.function.UnaryOperator; @@ -28,12 +29,15 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.IOOptions; 
import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter; import org.apache.cassandra.io.sstable.SSTableZeroCopyWriter; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.io.util.DataInputPlus; @@ -102,7 +106,7 @@ public class CassandraEntireSSTableStreamReader implements IStreamReader prettyPrintMemory(totalSize), cfs.metadata()); - SSTableZeroCopyWriter writer = null; + SSTableTxnZeroCopyWriter writer = null; try { @@ -121,7 +125,7 @@ public class CassandraEntireSSTableStreamReader implements IStreamReader prettyPrintMemory(totalSize)); writer.writeComponent(component, in, length); - session.progress(writer.descriptor.fileFor(component).toString(), ProgressInfo.Direction.IN, length, length, length); + session.progress(writer.descriptor().fileFor(component).toString(), ProgressInfo.Direction.IN, length, length, length); bytesRead += length; logger.debug("[Stream #{}] Finished receiving {} component from {}, componentSize = {}, readBytes = {}, totalSize = {}", @@ -137,7 +141,7 @@ public class CassandraEntireSSTableStreamReader implements IStreamReader .mutateRepairedMetadata(messageHeader.repairedAt, messageHeader.pendingRepair, false); String description = String.format("level %s and repairedAt time %s and pendingRepair %s", header.sstableLevel, messageHeader.repairedAt, messageHeader.pendingRepair); - writer.descriptor.getMetadataSerializer().mutate(writer.descriptor, description, transform); + writer.descriptor().getMetadataSerializer().mutate(writer.descriptor(), description, transform); return writer; } catch (Throwable e) @@ -167,14 +171,14 @@ public class CassandraEntireSSTableStreamReader implements IStreamReader return dir; } - protected SSTableZeroCopyWriter createWriter(ColumnFamilyStore cfs, long totalSize, Collection<Component> components) throws IOException + protected SSTableTxnZeroCopyWriter createWriter(ColumnFamilyStore cfs, long 
totalSize, Collection<Component> components) throws IOException { File dataDir = getDataDir(cfs, totalSize); StreamReceiver streamReceiver = session.getAggregator(tableId); assert streamReceiver instanceof CassandraStreamReceiver; - LifecycleNewTracker lifecycleNewTracker = CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).createLifecycleNewTracker(); + LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM); Descriptor desc = cfs.newSSTableDescriptor(dataDir, header.version); @@ -190,12 +194,32 @@ public class CassandraEntireSSTableStreamReader implements IStreamReader DatabaseDescriptor.getFlushCompression()); logger.debug("[Table #{}] {} Components to write: {}", cfs.metadata(), desc, components); - return desc.getFormat() - .getWriterFactory() - .builder(desc) - .setComponents(components) - .setTableMetadataRef(cfs.metadata) - .setIOOptions(ioOptions) - .createZeroCopyWriter(lifecycleNewTracker, cfs); + return new SSTableTxnZeroCopyWriter(txn, desc.getFormat() + .getWriterFactory() + .builder(desc) + .setComponents(components) + .setTableMetadataRef(cfs.metadata) + .setIOOptions(ioOptions) + .createZeroCopyWriter(txn, cfs)); + } + /** + * Pairs an {@link SSTableZeroCopyWriter} with the transaction handed to the + * {@link SSTableTxnSingleStreamWriter} superclass, and exposes the two zero-copy members this + * reader uses: {@code writeComponent} and the wrapped writer's descriptor. + */ + public static class SSTableTxnZeroCopyWriter extends SSTableTxnSingleStreamWriter + { + private final SSTableZeroCopyWriter writer; + public SSTableTxnZeroCopyWriter(ILifecycleTransaction txn, SSTableZeroCopyWriter writer) + { + super(txn, writer); + this.writer = writer; + } + /** + * Forwards to {@code writer.writeComponent(component, in, length)}. + * + * @throws ClosedChannelException declared by this method; NOTE(review): presumably propagated from the wrapped writer - confirm + */ + public void writeComponent(Component component, DataInputPlus in, long length) throws ClosedChannelException + { + writer.writeComponent(component, in, length); + } + /** + * @return the {@code descriptor} field of the wrapped zero-copy writer + */ + public Descriptor descriptor() + { + return writer.descriptor; + } } } diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java index e8a6fbcc7c..3498f28b61 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java +++ 
b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.streaming.IncomingStream; @@ -43,7 +44,7 @@ public class CassandraIncomingFile implements IncomingStream private final StreamSession session; private final StreamMessageHeader header; - private volatile SSTableMultiWriter sstable; + private volatile SSTableTxnSingleStreamWriter sstable; private volatile long size = -1; private volatile int numFiles = 1; @@ -84,7 +85,14 @@ public class CassandraIncomingFile implements IncomingStream reader = new CassandraStreamReader(header, streamHeader, session); size = streamHeader.size(); - sstable = reader.read(in); + sstable = (SSTableTxnSingleStreamWriter)reader.read(in); + } + /** + * Aborts the writer of this incoming file, if one has been assigned. + * + * @param t throwable accumulated so far; passed to {@code sstable.abort} + * @return the result of {@code sstable.abort(t)}, or {@code t} unchanged while {@code sstable} is still null + */ + public synchronized Throwable abort(Throwable t) + { + if (sstable != null) + t = sstable.abort(t); + return t; } @Override diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java index af4b4dbc18..bab8d5a84b 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java @@ -35,7 +35,9 @@ import org.apache.cassandra.db.DeletionTime; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.SerializationHeader; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import 
org.apache.cassandra.db.rows.DeserializationHelper; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.db.rows.Row; @@ -47,6 +49,7 @@ import org.apache.cassandra.exceptions.UnknownColumnException; import org.apache.cassandra.io.sstable.RangeAwareSSTableWriter; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.SSTableSimpleIterator; +import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.Version; @@ -172,7 +175,7 @@ public class CassandraStreamReader implements IStreamReader { return header != null? header.toHeader(metadata) : null; //pre-3.0 sstable have no SerializationHeader } - protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, TimeUUID pendingRepair, SSTableFormat<?, ?> format) throws IOException + protected SSTableTxnSingleStreamWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, TimeUUID pendingRepair, SSTableFormat<?, ?> format) throws IOException { Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize); if (localDir == null) @@ -180,10 +183,14 @@ public class CassandraStreamReader implements IStreamReader StreamReceiver streamReceiver = session.getAggregator(tableId); Preconditions.checkState(streamReceiver instanceof CassandraStreamReceiver); - LifecycleNewTracker lifecycleNewTracker = CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).createLifecycleNewTracker(); + ILifecycleTransaction txn = createTxn(); + RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, false, format, sstableLevel, totalSize, txn, getHeader(cfs.metadata())); + return new SSTableTxnSingleStreamWriter(txn, writer); + } - RangeAwareSSTableWriter writer = 
new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, false, format, sstableLevel, totalSize, lifecycleNewTracker, getHeader(cfs.metadata())); - return writer; + /** Returns a new offline transaction of type {@code OperationType.STREAM}; {@code createWriter} passes it to both the RangeAwareSSTableWriter and the SSTableTxnSingleStreamWriter that wraps it. */ private ILifecycleTransaction createTxn() + { + return LifecycleTransaction.offline(OperationType.STREAM); } protected long totalSize() diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java index 480fcbe951..fc3e9044cf 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java @@ -35,10 +35,8 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.ColumnFilter; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; -import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.lifecycle.StreamingLifecycleTransaction; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.rows.ThrottledUnfilteredIterator; import org.apache.cassandra.db.rows.UnfilteredRowIterator; @@ -47,8 +45,7 @@ import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.ISSTableScanner; -import org.apache.cassandra.io.sstable.SSTable; -import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.AccordTopology; @@ -80,7 +77,7 @@ public class CassandraStreamReceiver implements StreamReceiver 
private final StreamSession session; // Transaction tracking new files received - private final LifecycleTransaction txn; + private final StreamingLifecycleTransaction txn; // holds references to SSTables received protected final Collection<SSTableReader> sstables; @@ -98,7 +95,7 @@ public class CassandraStreamReceiver implements StreamReceiver this.session = session; // this is an "offline" transaction, as we currently manually expose the sstables once done; // this should be revisited at a later date, so that LifecycleTransaction manages all sstable state changes - this.txn = LifecycleTransaction.offline(OperationType.STREAM); + this.txn = new StreamingLifecycleTransaction(); this.ranges = ranges; this.sstables = new ArrayList<>(totalFiles); this.requiresWritePath = requiresWritePath(cfs); @@ -122,16 +119,16 @@ public class CassandraStreamReceiver implements StreamReceiver CassandraIncomingFile file = getFile(stream); Collection<SSTableReader> finished = null; - SSTableMultiWriter sstable = file.getSSTable(); + SSTableTxnSingleStreamWriter sstable = (SSTableTxnSingleStreamWriter)file.getSSTable(); try { - finished = sstable.finish(true); + finished = sstable.transferOwnershipTo(txn); } catch (Throwable t) { Throwables.maybeFail(sstable.abort(t)); } - txn.update(finished, false); + txn.update(finished); sstables.addAll(finished); receivedEntireSSTable = file.isEntireSSTable(); } @@ -143,39 +140,6 @@ public class CassandraStreamReceiver implements StreamReceiver Throwables.maybeFail(file.getSSTable().abort(null)); } - /** - * @return a LifecycleNewTracker whose operations are synchronised on this StreamReceiveTask. 
- */ - public synchronized LifecycleNewTracker createLifecycleNewTracker() - { - return new LifecycleNewTracker() - { - @Override - public void trackNew(SSTable table) - { - synchronized (CassandraStreamReceiver.this) - { - txn.trackNew(table); - } - } - - @Override - public void untrackNew(SSTable table) - { - synchronized (CassandraStreamReceiver.this) - { - txn.untrackNew(table); - } - } - - public OperationType opType() - { - return txn.opType(); - } - }; - } - - @Override public synchronized void abort() { diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java index f9e2265631..8ebbc4226d 100644 --- a/src/java/org/apache/cassandra/index/Index.java +++ b/src/java/org/apache/cassandra/index/Index.java @@ -47,7 +47,7 @@ import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.WriteContext; import org.apache.cassandra.db.filter.RowFilter; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.memtable.Memtable; import org.apache.cassandra.db.partitions.PartitionIterator; @@ -384,11 +384,11 @@ public interface Index * Get flush observer to observe partition/cell events generated by flushing SSTable (memtable flush or compaction). * * @param descriptor The descriptor of the sstable observer is requested for. - * @param tracker The {@link LifecycleNewTracker} associated with the SSTable being written + * @param txn The {@link ILifecycleTransaction} associated with the SSTable being written * * @return SSTable flush observer. 
*/ - default SSTableFlushObserver getFlushObserver(Descriptor descriptor, LifecycleNewTracker tracker) + default SSTableFlushObserver getFlushObserver(Descriptor descriptor, ILifecycleTransaction txn) { return null; } @@ -822,12 +822,12 @@ public interface Index * Get flush observer to observe partition/cell events generated by flushing SSTable (memtable flush or compaction). * * @param descriptor The descriptor of the sstable observer is requested for. - * @param tracker The {@link LifecycleNewTracker} associated with the SSTable being written + * @param txn The {@link ILifecycleTransaction} associated with the SSTable being written * @param tableMetadata The immutable metadata of the table at the moment the SSTable is flushed * * @return SSTable flush observer. */ - SSTableFlushObserver getFlushObserver(Descriptor descriptor, LifecycleNewTracker tracker, TableMetadata tableMetadata); + SSTableFlushObserver getFlushObserver(Descriptor descriptor, ILifecycleTransaction txn, TableMetadata tableMetadata); /** * @param type index transaction type diff --git a/src/java/org/apache/cassandra/index/IndexRegistry.java b/src/java/org/apache/cassandra/index/IndexRegistry.java index 5fa91ca092..8b14ef2c9e 100644 --- a/src/java/org/apache/cassandra/index/IndexRegistry.java +++ b/src/java/org/apache/cassandra/index/IndexRegistry.java @@ -35,7 +35,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.Operator; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.RowFilter; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.memtable.Memtable; @@ -250,7 +250,7 @@ public interface IndexRegistry extends Iterable<Index> @Nullable @Override - public SSTableFlushObserver getFlushObserver(Descriptor descriptor, 
LifecycleNewTracker tracker, TableMetadata tableMetadata) + public SSTableFlushObserver getFlushObserver(Descriptor descriptor, ILifecycleTransaction txn, TableMetadata tableMetadata) { return null; } diff --git a/src/java/org/apache/cassandra/index/SingletonIndexGroup.java b/src/java/org/apache/cassandra/index/SingletonIndexGroup.java index 162247fd74..501144f069 100644 --- a/src/java/org/apache/cassandra/index/SingletonIndexGroup.java +++ b/src/java/org/apache/cassandra/index/SingletonIndexGroup.java @@ -29,7 +29,7 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.WriteContext; import org.apache.cassandra.db.filter.RowFilter; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.memtable.Memtable; import org.apache.cassandra.index.transactions.IndexTransaction; import org.apache.cassandra.io.sstable.Component; @@ -88,9 +88,9 @@ public class SingletonIndexGroup implements Index.Group } @Override - public SSTableFlushObserver getFlushObserver(Descriptor descriptor, LifecycleNewTracker tracker, TableMetadata tableMetadata) + public SSTableFlushObserver getFlushObserver(Descriptor descriptor, ILifecycleTransaction txn, TableMetadata tableMetadata) { - return delegate.getFlushObserver(descriptor, tracker); + return delegate.getFlushObserver(descriptor, txn); } @Override diff --git a/src/java/org/apache/cassandra/index/accord/MemtableIndexManager.java b/src/java/org/apache/cassandra/index/accord/MemtableIndexManager.java index bdaebc2a94..54bcb238a6 100644 --- a/src/java/org/apache/cassandra/index/accord/MemtableIndexManager.java +++ b/src/java/org/apache/cassandra/index/accord/MemtableIndexManager.java @@ -22,7 +22,7 @@ import java.nio.ByteBuffer; import java.util.NavigableSet; import org.apache.cassandra.db.DecoratedKey; -import 
org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.memtable.Memtable; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.schema.TableId; @@ -31,7 +31,7 @@ public interface MemtableIndexManager { long index(DecoratedKey key, Row row, Memtable mt); - MemtableIndex getPendingMemtableIndex(LifecycleNewTracker tracker); + MemtableIndex getPendingMemtableIndex(ILifecycleTransaction txn); void discardMemtable(Memtable memtable); diff --git a/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java b/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java index aa97dd981e..acd014a5c4 100644 --- a/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java +++ b/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java @@ -49,7 +49,7 @@ import org.apache.cassandra.db.WriteContext; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.RowFilter; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.lifecycle.Tracker; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.Int32Type; @@ -317,17 +317,17 @@ public class RouteJournalIndex implements Index, INotificationConsumer @Override public SSTableFlushObserver getFlushObserver(Descriptor descriptor, - LifecycleNewTracker tracker) + ILifecycleTransaction txn) { // mimics org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat.newPerColumnIndexWriter IndexDescriptor id = IndexDescriptor.create(descriptor, baseCfs.getPartitioner(), baseCfs.metadata().comparator); - if (tracker.opType() != OperationType.FLUSH || !initBuildStarted) + if (txn.opType() != OperationType.FLUSH || !initBuildStarted) { return new 
RouteIndexFormat.SSTableIndexWriter(this, id); } else { - return new RouteIndexFormat.MemtableRouteIndexWriter(id, memtableIndexManager.getPendingMemtableIndex(tracker)); + return new RouteIndexFormat.MemtableRouteIndexWriter(id, memtableIndexManager.getPendingMemtableIndex(txn)); } } diff --git a/src/java/org/apache/cassandra/index/accord/RouteMemtableIndexManager.java b/src/java/org/apache/cassandra/index/accord/RouteMemtableIndexManager.java index f979ea5cfe..13159d1acb 100644 --- a/src/java/org/apache/cassandra/index/accord/RouteMemtableIndexManager.java +++ b/src/java/org/apache/cassandra/index/accord/RouteMemtableIndexManager.java @@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.memtable.Memtable; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.schema.TableId; @@ -73,10 +73,10 @@ public class RouteMemtableIndexManager implements MemtableIndexManager } @Override - public MemtableIndex getPendingMemtableIndex(LifecycleNewTracker tracker) + public MemtableIndex getPendingMemtableIndex(ILifecycleTransaction txn) { return liveMemtableIndexMap.keySet().stream() - .filter(m -> tracker.equals(m.getFlushTransaction())) + .filter(m -> txn.equals(m.getFlushTransaction())) .findFirst() .map(liveMemtableIndexMap::get) .orElse(null); diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java index c17f9fb0a9..c93923562c 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java @@ -67,7 +67,7 @@ import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.db.guardrails.GuardrailViolatedException; import 
org.apache.cassandra.db.guardrails.Guardrails; import org.apache.cassandra.db.guardrails.MaxThreshold; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.FloatType; import org.apache.cassandra.db.memtable.Memtable; @@ -541,7 +541,7 @@ public class StorageAttachedIndex implements Index } @Override - public SSTableFlushObserver getFlushObserver(Descriptor descriptor, LifecycleNewTracker tracker) + public SSTableFlushObserver getFlushObserver(Descriptor descriptor, ILifecycleTransaction txn) { // flush observers should be created from the index group, this is only used by the singleton index group throw new UnsupportedOperationException("Storage-attached index flush observers should never be created directly."); diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java index 02e7971814..28711e9abe 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java @@ -39,7 +39,7 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.WriteContext; import org.apache.cassandra.db.filter.RowFilter; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.lifecycle.Tracker; import org.apache.cassandra.db.memtable.Memtable; import org.apache.cassandra.db.rows.Row; @@ -200,12 +200,12 @@ public class StorageAttachedIndexGroup implements Index.Group, INotificationCons } @Override - public SSTableFlushObserver getFlushObserver(Descriptor descriptor, LifecycleNewTracker tracker, TableMetadata tableMetadata) + public SSTableFlushObserver 
getFlushObserver(Descriptor descriptor, ILifecycleTransaction txn, TableMetadata tableMetadata) { IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor, tableMetadata.partitioner, tableMetadata.comparator); try { - return StorageAttachedIndexWriter.createFlushObserverWriter(indexDescriptor, indexes, tracker); + return StorageAttachedIndexWriter.createFlushObserverWriter(indexDescriptor, indexes, txn); } catch (Throwable t) { diff --git a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java index 341bcaac5e..ea1a7d69c7 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java @@ -29,7 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.db.tries.InMemoryTrie; @@ -63,29 +63,29 @@ public class StorageAttachedIndexWriter implements SSTableFlushObserver public static StorageAttachedIndexWriter createFlushObserverWriter(IndexDescriptor indexDescriptor, Collection<StorageAttachedIndex> indexes, - LifecycleNewTracker lifecycleNewTracker) throws IOException + ILifecycleTransaction txn) throws IOException { - return new StorageAttachedIndexWriter(indexDescriptor, indexes, lifecycleNewTracker, false); + return new StorageAttachedIndexWriter(indexDescriptor, indexes, txn, false); } public static StorageAttachedIndexWriter createBuilderWriter(IndexDescriptor indexDescriptor, Collection<StorageAttachedIndex> indexes, - LifecycleNewTracker lifecycleNewTracker, + ILifecycleTransaction txn, boolean perIndexComponentsOnly) throws IOException { - return new 
StorageAttachedIndexWriter(indexDescriptor, indexes, lifecycleNewTracker, perIndexComponentsOnly); + return new StorageAttachedIndexWriter(indexDescriptor, indexes, txn, perIndexComponentsOnly); } private StorageAttachedIndexWriter(IndexDescriptor indexDescriptor, Collection<StorageAttachedIndex> indexes, - LifecycleNewTracker lifecycleNewTracker, + ILifecycleTransaction txn, boolean perIndexComponentsOnly) throws IOException { this.indexDescriptor = indexDescriptor; - this.rowMapping = RowMapping.create(lifecycleNewTracker.opType()); + this.rowMapping = RowMapping.create(txn.opType()); this.perIndexWriters = indexes.stream().map(index -> indexDescriptor.newPerColumnIndexWriter(index, - lifecycleNewTracker, + txn, rowMapping)) .filter(Objects::nonNull) // a null here means the column had no data to flush .collect(Collectors.toList()); diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java index 4501aec7f9..976097b084 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java +++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java @@ -30,7 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ClusteringComparator; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.index.sai.StorageAttachedIndex; import org.apache.cassandra.index.sai.IndexValidation; @@ -135,10 +135,10 @@ public class IndexDescriptor } public PerColumnIndexWriter newPerColumnIndexWriter(StorageAttachedIndex index, - LifecycleNewTracker tracker, + ILifecycleTransaction txn, RowMapping rowMapping) { - return version.onDiskFormat().newPerColumnIndexWriter(index, this, tracker, rowMapping); + return version.onDiskFormat().newPerColumnIndexWriter(index, this, 
txn, rowMapping); } public boolean isPerSSTableIndexBuildComplete() diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java index 30ba3b6295..a33a5f7837 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java @@ -22,7 +22,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Set; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.index.sai.SSTableContext; import org.apache.cassandra.index.sai.StorageAttachedIndex; import org.apache.cassandra.index.sai.disk.PerColumnIndexWriter; @@ -80,19 +80,19 @@ public interface OnDiskFormat PerSSTableIndexWriter newPerSSTableIndexWriter(IndexDescriptor indexDescriptor) throws IOException; /** - * Create a new {@link PerColumnIndexWriter} to write the per-column on-disk components of an index. The {@link LifecycleNewTracker} + * Create a new {@link PerColumnIndexWriter} to write the per-column on-disk components of an index. The {@link ILifecycleTransaction} * is used to determine the type of index write about to happen this will either be an * {@code OperationType.FLUSH} indicating that we are about to flush a {@link org.apache.cassandra.index.sai.memory.MemtableIndex} * or one of the other operation types indicating that we will be writing from an existing SSTable * * @param index The {@link StorageAttachedIndex} holding the current index build status * @param indexDescriptor The {@link IndexDescriptor} for the SSTable - * @param tracker The {@link LifecycleNewTracker} for index build operation. + * @param txn The {@link ILifecycleTransaction} for index build operation. 
* @param rowMapping The {@link RowMapping} that is used to map rowID to {@code PrimaryKey} during the write operation */ PerColumnIndexWriter newPerColumnIndexWriter(StorageAttachedIndex index, IndexDescriptor indexDescriptor, - LifecycleNewTracker tracker, + ILifecycleTransaction txn, RowMapping rowMapping); /** diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java index 8d8266ac34..d8f5d9e631 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java @@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory; import com.codahale.metrics.Gauge; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.index.sai.SSTableContext; import org.apache.cassandra.index.sai.StorageAttachedIndex; import org.apache.cassandra.index.sai.disk.PerColumnIndexWriter; @@ -151,11 +151,11 @@ public class V1OnDiskFormat implements OnDiskFormat @Override public PerColumnIndexWriter newPerColumnIndexWriter(StorageAttachedIndex index, IndexDescriptor indexDescriptor, - LifecycleNewTracker tracker, + ILifecycleTransaction txn, RowMapping rowMapping) { // If we're not flushing, or we haven't yet started the initialization build, flush from SSTable contents. - if (tracker.opType() != OperationType.FLUSH || !index.isInitBuildStarted()) + if (txn.opType() != OperationType.FLUSH || !index.isInitBuildStarted()) { NamedMemoryLimiter limiter = SEGMENT_BUILD_MEMORY_LIMITER; logger.info(index.identifier().logMessage("Starting a compaction index build. 
Global segment memory usage: {}"), @@ -164,7 +164,7 @@ public class V1OnDiskFormat implements OnDiskFormat return new SSTableIndexWriter(indexDescriptor, index, limiter, index.isIndexValid()); } - return new MemtableIndexWriter(index.memtableIndexManager().getPendingMemtableIndex(tracker), + return new MemtableIndexWriter(index.memtableIndexManager().getPendingMemtableIndex(txn), indexDescriptor, index.termType(), index.identifier(), diff --git a/src/java/org/apache/cassandra/index/sai/memory/MemtableIndexManager.java b/src/java/org/apache/cassandra/index/sai/memory/MemtableIndexManager.java index abd62dbff1..99bff5a1e1 100644 --- a/src/java/org/apache/cassandra/index/sai/memory/MemtableIndexManager.java +++ b/src/java/org/apache/cassandra/index/sai/memory/MemtableIndexManager.java @@ -32,7 +32,7 @@ import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.memtable.Memtable; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.index.sai.StorageAttachedIndex; @@ -119,10 +119,10 @@ public class MemtableIndexManager } @Nullable - public MemtableIndex getPendingMemtableIndex(LifecycleNewTracker tracker) + public MemtableIndex getPendingMemtableIndex(ILifecycleTransaction txn) { return liveMemtableIndexMap.keySet().stream() - .filter(m -> tracker.equals(m.getFlushTransaction())) + .filter(m -> txn.equals(m.getFlushTransaction())) .findFirst() .map(liveMemtableIndexMap::get) .orElse(null); diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java index ccfed7f5c9..b2989cb8d8 100644 --- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java @@ -43,7 +43,7 @@ import 
org.apache.cassandra.db.WriteContext; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.RowFilter; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.lifecycle.Tracker; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.memtable.Memtable; @@ -323,9 +323,9 @@ public class SASIIndex implements Index, INotificationConsumer return new SASIIndexSearcher(cfs, command, DatabaseDescriptor.getRangeRpcTimeout(MILLISECONDS)); } - public SSTableFlushObserver getFlushObserver(Descriptor descriptor, LifecycleNewTracker tracker) + public SSTableFlushObserver getFlushObserver(Descriptor descriptor, ILifecycleTransaction txn) { - return newWriter(baseCfs.metadata().partitionKeyType, descriptor, Collections.singletonMap(index.getDefinition(), index), tracker.opType()); + return newWriter(baseCfs.metadata().partitionKeyType, descriptor, Collections.singletonMap(index.getDefinition(), index), txn.opType()); } public IndexBuildingSupport getBuildTaskSupport() diff --git a/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java index 422c6eaa6e..1b4fa8e8fe 100644 --- a/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java @@ -28,7 +28,7 @@ import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.DiskBoundaries; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.db.SerializationHeader; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import 
org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -47,14 +47,13 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter private final boolean isTransient; private final SSTableFormat<?, ?> format; private final SerializationHeader header; - private final LifecycleNewTracker lifecycleNewTracker; + private final ILifecycleTransaction txn; private int currentIndex = -1; public final ColumnFamilyStore cfs; private final List<SSTableMultiWriter> finishedWriters = new ArrayList<>(); - private final List<SSTableReader> finishedReaders = new ArrayList<>(); private SSTableMultiWriter currentWriter = null; - public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, TimeUUID pendingRepair, boolean isTransient, SSTableFormat<?, ?> format, int sstableLevel, long totalSize, LifecycleNewTracker lifecycleNewTracker, SerializationHeader header) throws IOException + public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, TimeUUID pendingRepair, boolean isTransient, SSTableFormat<?, ?> format, int sstableLevel, long totalSize, ILifecycleTransaction txn, SerializationHeader header) throws IOException { DiskBoundaries db = cfs.getDiskBoundaries(); directories = db.directories; @@ -65,7 +64,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter this.pendingRepair = pendingRepair; this.isTransient = isTransient; this.format = format; - this.lifecycleNewTracker = lifecycleNewTracker; + this.txn = txn; this.header = header; boundaries = db.positions; if (boundaries == null) @@ -75,7 +74,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize))); Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(localDir), format); - currentWriter = 
cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, isTransient, null, sstableLevel, header, lifecycleNewTracker); + currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, isTransient, null, sstableLevel, header, txn); } } @@ -97,7 +96,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter finishedWriters.add(currentWriter); Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex)), format); - currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, isTransient, null, sstableLevel, header, lifecycleNewTracker); + currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, isTransient, null, sstableLevel, header, txn); } } @@ -113,6 +112,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter if (currentWriter != null) finishedWriters.add(currentWriter); currentWriter = null; + List<SSTableReader> finishedReaders = new ArrayList<>(finishedWriters.size()); for (SSTableMultiWriter writer : finishedWriters) { if (writer.getBytesWritten() > 0) @@ -126,6 +126,12 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter @Override public Collection<SSTableReader> finished() { + List<SSTableReader> finishedReaders = new ArrayList<>(finishedWriters.size()); + for (SSTableMultiWriter writer : finishedWriters) + { + if (writer != null) + finishedReaders.addAll(writer.finished()); + } return finishedReaders; } diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnSingleStreamWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnSingleStreamWriter.java new file mode 100644 index 0000000000..390e63858f --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnSingleStreamWriter.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.sstable; + +import java.util.Collection; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; +import org.apache.cassandra.db.lifecycle.StreamingLifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.utils.Throwables; + +public class SSTableTxnSingleStreamWriter implements SSTableMultiWriter +{ + private static final Logger logger = LoggerFactory.getLogger(SSTableTxnSingleStreamWriter.class); + + private final ILifecycleTransaction txn; + private final SSTableMultiWriter writer; + private boolean complete = false; + + public SSTableTxnSingleStreamWriter(ILifecycleTransaction txn, SSTableMultiWriter writer) + { + this.txn = txn; + this.writer = writer; + } + + @Override + public void append(UnfilteredRowIterator partition) + { + writer.append(partition); + } + + public synchronized Collection<SSTableReader> transferOwnershipTo(StreamingLifecycleTransaction globalTxn) + { + failIfComplete(); + writer.setOpenResult(true); + writer.prepareToCommit(); + txn.prepareToCommit(); + globalTxn.takeOwnership(txn); + 
Throwables.maybeFail(writer.commit(txn.commit(null))); + complete = true; + return writer.finished(); + } + + @Override + public synchronized Throwable abort(Throwable accumulate) + { + if (complete) + { + logger.debug("Already completed writer for '{}'. Nothing to abort.", getFilename()); + return accumulate; + } + + complete = true; + return txn.abort(writer.abort(accumulate)); + } + + @Override + public synchronized void close() + { + complete = true; + writer.close(); + } + + private void failIfComplete() + { + if (complete) + throw new IllegalStateException("Writer "+getFilename()+" has already completed"); + } + + @Override + public String getFilename() + { + return writer.getFilename(); + } + + @Override + public long getBytesWritten() + { + return writer.getBytesWritten(); + } + + @Override + public long getOnDiskBytesWritten() + { + return writer.getOnDiskBytesWritten(); + } + + @Override + public TableId getTableId() + { + return writer.getTableId(); + } + + @Override + public Collection<SSTableReader> finish(boolean openResult) + { + throw new UnsupportedOperationException("SSTableTxnSingleStreamWriter should be finished via transferOwnershipTo"); + } + + @Override + public Collection<SSTableReader> finished() + { + throw new UnsupportedOperationException("SSTableTxnSingleStreamWriter should be finished via transferOwnershipTo"); + } + + @Override + public SSTableMultiWriter setOpenResult(boolean openResult) + { + throw new UnsupportedOperationException("SSTableTxnSingleStreamWriter should be finished via transferOwnershipTo"); + } + + @Override + public void prepareToCommit() + { + throw new UnsupportedOperationException("SSTableTxnSingleStreamWriter should be finished via transferOwnershipTo"); + } + + @Override + public Throwable commit(Throwable accumulate) + { + throw new UnsupportedOperationException("SSTableTxnSingleStreamWriter should be finished via transferOwnershipTo"); + } + +} diff --git 
a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java index 46a490974e..47566ee550 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java @@ -31,7 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Token; @@ -56,12 +56,11 @@ public class SSTableZeroCopyWriter extends SSTable implements SSTableMultiWriter private final Map<String, ZeroCopySequentialWriter> componentWriters; // indexed by component name public SSTableZeroCopyWriter(Builder<?, ?> builder, - LifecycleNewTracker lifecycleNewTracker, + ILifecycleTransaction txn, SSTable.Owner owner) { super(builder, owner); - - lifecycleNewTracker.trackNew(this); + txn.trackNew(this); this.componentWriters = new HashMap<>(); Set<Component> unsupported = components.stream() diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java index 99406dba31..6c7b7b6d7f 100644 --- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java @@ -23,7 +23,7 @@ import java.util.Collections; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.commitlog.IntervalSet; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import 
org.apache.cassandra.index.Index; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -36,11 +36,11 @@ import org.apache.cassandra.utils.TimeUUID; public class SimpleSSTableMultiWriter implements SSTableMultiWriter { private final SSTableWriter writer; - private final LifecycleNewTracker lifecycleNewTracker; + private final ILifecycleTransaction txn; - protected SimpleSSTableMultiWriter(SSTableWriter writer, LifecycleNewTracker lifecycleNewTracker) + protected SimpleSSTableMultiWriter(SSTableWriter writer, ILifecycleTransaction txn) { - this.lifecycleNewTracker = lifecycleNewTracker; + this.txn = txn; this.writer = writer; } @@ -92,7 +92,7 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter public Throwable abort(Throwable accumulate) { - lifecycleNewTracker.untrackNew(writer); + txn.untrackNew(writer); return writer.abort(accumulate); } @@ -116,7 +116,7 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter int sstableLevel, SerializationHeader header, Collection<Index.Group> indexGroups, - LifecycleNewTracker lifecycleNewTracker, + ILifecycleTransaction txn, SSTable.Owner owner) { MetadataCollector metadataCollector = new MetadataCollector(metadata.get().comparator) @@ -132,7 +132,7 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter .setSerializationHeader(header) .addDefaultComponents(indexGroups) .setSecondaryIndexGroups(indexGroups) - .build(lifecycleNewTracker, owner); - return new SimpleSSTableMultiWriter(writer, lifecycleNewTracker); + .build(txn, owner); + return new SimpleSSTableMultiWriter(writer, txn); } } diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java index 654880c2c1..6ffaa8a7ba 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java @@ -24,7 +24,6 @@ import javax.annotation.Nonnull; import 
org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.AbstractRowIndexEntry; @@ -137,7 +136,7 @@ public interface SSTableFormat<R extends SSTableReader, W extends SSTableWriter> /** * Returns a new builder which can create instance of {@link SSTableWriter} with the provided parameters. * Similarly to the loading builder, it should open the required resources when - * the {@link SSTableWriter.Builder#build(LifecycleNewTracker, SSTable.Owner)} method is called. + * the {@link SSTableWriter.Builder#build(org.apache.cassandra.db.lifecycle.ILifecycleTransaction, SSTable.Owner)} method is called. * It should not let the caller passing any closeable resources directly, that is, via setters. * If building fails, all the opened resources should be released. */ diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index 38efd1955e..a3f24c0e5b 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.SerializationHeader; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Token; @@ -63,7 +63,7 @@ import static com.google.common.base.Preconditions.checkNotNull; /** * A root class for a writer implementation. 
A writer must be created by passing an implementation-specific - * {@link Builder}, a {@link LifecycleNewTracker} and {@link SSTable.Owner} instances. Implementing classes should + * {@link Builder}, a {@link ILifecycleTransaction} and {@link SSTable.Owner} instances. Implementing classes should * not extend that list and all the additional properties should be included in the builder. */ public abstract class SSTableWriter extends SSTable implements Transactional @@ -80,7 +80,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional protected final List<SSTableFlushObserver> observers; protected final MmappedRegionsCache mmappedRegionsCache; protected final TransactionalProxy txnProxy = txnProxy(); - protected final LifecycleNewTracker lifecycleNewTracker; + protected final ILifecycleTransaction txn; protected DecoratedKey first; protected DecoratedKey last; @@ -90,7 +90,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional */ protected abstract TransactionalProxy txnProxy(); - protected SSTableWriter(Builder<?, ?> builder, LifecycleNewTracker lifecycleNewTracker, SSTable.Owner owner) + protected SSTableWriter(Builder<?, ?> builder, ILifecycleTransaction txn, SSTable.Owner owner) { super(builder, owner); checkNotNull(builder.getIndexGroups()); @@ -104,7 +104,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional this.metadataCollector = builder.getMetadataCollector(); this.header = builder.getSerializationHeader(); this.mmappedRegionsCache = builder.getMmappedRegionsCache(); - this.lifecycleNewTracker = lifecycleNewTracker; + this.txn = txn; // We need to ensure that no sstable components exist before the lifecycle transaction starts tracking it. 
// Otherwise, it means that we either want to overwrite some existing sstable, which is not allowed, or some @@ -116,7 +116,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional descriptor.directory, existingComponents); - lifecycleNewTracker.trackNew(this); + txn.trackNew(this); try { @@ -124,7 +124,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional this.observers = Collections.unmodifiableList(observers); for (Index.Group group : builder.getIndexGroups()) { - SSTableFlushObserver observer = group.getFlushObserver(descriptor, lifecycleNewTracker, metadata.getLocal()); + SSTableFlushObserver observer = group.getFlushObserver(descriptor, txn, metadata.getLocal()); if (observer != null) { observer.begin(); @@ -146,7 +146,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional * The caught exception should be then rethrown so the {@link Builder} can handle it and close any resources opened * implicitly by the builder. * <p> - * See {@link SortedTableWriter#SortedTableWriter(SortedTableWriter.Builder, LifecycleNewTracker, Owner)} as of CASSANDRA-18737. + * See {@link SortedTableWriter#SortedTableWriter(SortedTableWriter.Builder, ILifecycleTransaction, Owner)} as of CASSANDRA-18737. * * @param ex the exception thrown during the construction */ @@ -156,7 +156,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional for (int i = observers.size()-1; i >= 0; i--) observers.get(i).abort(ex); descriptor.getFormat().deleteOrphanedComponents(descriptor, components); - lifecycleNewTracker.untrackNew(this); + txn.untrackNew(this); } @Override @@ -422,7 +422,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional /** * A builder of this sstable writer. It should be extended for each implementation with the specific fields. 
* - * An implementation should open all the resources when {@link #build(LifecycleNewTracker, Owner)} and pass them + * An implementation should open all the resources when {@link #build(ILifecycleTransaction, Owner)} and pass them * in builder fields to the writer, so that the writer can access them via getters. * * @param <W> type of the sstable writer to be build with this builder @@ -557,20 +557,20 @@ public abstract class SSTableWriter extends SSTable implements Transactional super(descriptor); } - public W build(LifecycleNewTracker lifecycleNewTracker, Owner owner) + public W build(ILifecycleTransaction txn, Owner owner) { checkNotNull(getComponents()); validateRepairedMetadata(getRepairedAt(), getPendingRepair(), isTransientSSTable()); - return buildInternal(lifecycleNewTracker, owner); + return buildInternal(txn, owner); } - protected abstract W buildInternal(LifecycleNewTracker lifecycleNewTracker, Owner owner); + protected abstract W buildInternal(ILifecycleTransaction txn, Owner owner); - public SSTableZeroCopyWriter createZeroCopyWriter(LifecycleNewTracker lifecycleNewTracker, Owner owner) + public SSTableZeroCopyWriter createZeroCopyWriter(ILifecycleTransaction txn, Owner owner) { - return new SSTableZeroCopyWriter(this, lifecycleNewTracker, owner); + return new SSTableZeroCopyWriter(this, txn, owner); } } } diff --git a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java index 6fba07ba1e..5ccaf26710 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java @@ -35,7 +35,7 @@ import org.apache.cassandra.db.DeletionPurger; import org.apache.cassandra.db.DeletionTime; import org.apache.cassandra.db.guardrails.Guardrails; import org.apache.cassandra.db.guardrails.Threshold; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import 
org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.rows.ComplexColumnData; import org.apache.cassandra.db.rows.PartitionSerializationException; import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker; @@ -88,9 +88,9 @@ public abstract class SortedTableWriter<P extends SortedTablePartitionWriter, I private long lastEarlyOpenLength; private final Supplier<Double> crcCheckChanceSupplier; - public SortedTableWriter(Builder<P, I, ?, ?> builder, LifecycleNewTracker lifecycleNewTracker, SSTable.Owner owner) + public SortedTableWriter(Builder<P, I, ?, ?> builder, ILifecycleTransaction txn, SSTable.Owner owner) { - super(builder, lifecycleNewTracker, owner); + super(builder, txn, owner); TableMetadataRef ref = builder.getTableMetadataRef(); crcCheckChanceSupplier = () -> ref.getLocal().params.crcCheckChance; diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index 1cd63e56b1..3233ca4c06 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -33,7 +33,6 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.DeletionTime; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.apache.cassandra.index.Index; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.sstable.AbstractRowIndexEntry; @@ -74,16 +73,15 @@ public class BigTableWriter extends SortedTableWriter<BigFormatPartitionWriter, private final Map<DecoratedKey, AbstractRowIndexEntry> cachedKeys = new HashMap<>(); private final boolean shouldMigrateKeyCache; - public BigTableWriter(Builder builder, LifecycleNewTracker lifecycleNewTracker, SSTable.Owner owner) + public 
BigTableWriter(Builder builder, ILifecycleTransaction txn, SSTable.Owner owner) { - super(builder, lifecycleNewTracker, owner); + super(builder, txn, owner); this.rowIndexEntrySerializer = builder.getRowIndexEntrySerializer(); checkNotNull(this.rowIndexEntrySerializer); this.shouldMigrateKeyCache = DatabaseDescriptor.shouldMigrateKeycacheOnCompaction() - && lifecycleNewTracker instanceof ILifecycleTransaction - && !((ILifecycleTransaction) lifecycleNewTracker).isOffline(); + && !txn.isOffline(); } @Override @@ -114,7 +112,7 @@ public class BigTableWriter extends SortedTableWriter<BigFormatPartitionWriter, if (shouldMigrateKeyCache) { - for (SSTableReader reader : ((ILifecycleTransaction) lifecycleNewTracker).originals()) + for (SSTableReader reader : txn.originals()) { if (reader instanceof KeyCacheSupport<?> && ((KeyCacheSupport<?>) reader).getCachedPosition(key, false) != null) { @@ -431,14 +429,14 @@ public class BigTableWriter extends SortedTableWriter<BigFormatPartitionWriter, } @Override - protected BigTableWriter buildInternal(LifecycleNewTracker lifecycleNewTracker, Owner owner) + protected BigTableWriter buildInternal(ILifecycleTransaction txn, Owner owner) { try { - this.operationType = lifecycleNewTracker.opType(); + this.operationType = txn.opType(); this.mmappedRegionsCache = new MmappedRegionsCache(); this.rowIndexEntrySerializer = new RowIndexEntry.Serializer(descriptor.version, getSerializationHeader(), owner != null ? 
owner.getMetrics() : null); - return new BigTableWriter(this, lifecycleNewTracker, owner); + return new BigTableWriter(this, txn, owner); } catch (RuntimeException | Error ex) { diff --git a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java index 7aad38511f..074c5c1708 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java @@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.DeletionTime; import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.index.Index; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.FSWriteError; @@ -65,9 +65,9 @@ public class BtiTableWriter extends SortedTableWriter<BtiFormatPartitionWriter, { private static final Logger logger = LoggerFactory.getLogger(BtiTableWriter.class); - public BtiTableWriter(Builder builder, LifecycleNewTracker lifecycleNewTracker, SSTable.Owner owner) + public BtiTableWriter(Builder builder, ILifecycleTransaction txn, SSTable.Owner owner) { - super(builder, lifecycleNewTracker, owner); + super(builder, txn, owner); } @Override @@ -371,14 +371,14 @@ public class BtiTableWriter extends SortedTableWriter<BtiFormatPartitionWriter, } @Override - protected BtiTableWriter buildInternal(LifecycleNewTracker lifecycleNewTracker, Owner owner) + protected BtiTableWriter buildInternal(ILifecycleTransaction txn, Owner owner) { try { this.mmappedRegionsCache = new MmappedRegionsCache(); - this.operationType = lifecycleNewTracker.opType(); + this.operationType = txn.opType(); - return new BtiTableWriter(this, lifecycleNewTracker, owner); + return new BtiTableWriter(this, txn, 
owner); } catch (RuntimeException | Error ex) { diff --git a/src/java/org/apache/cassandra/streaming/IncomingStream.java b/src/java/org/apache/cassandra/streaming/IncomingStream.java index 25ab62630c..22141ef85f 100644 --- a/src/java/org/apache/cassandra/streaming/IncomingStream.java +++ b/src/java/org/apache/cassandra/streaming/IncomingStream.java @@ -36,7 +36,7 @@ public interface IncomingStream * Read in the stream data. */ void read(DataInputPlus inputPlus, int version) throws Throwable; - + default Throwable abort(Throwable t) { return t; } String getName(); long getSize(); int getNumFiles(); diff --git a/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java b/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java index 2d05322543..a9ddabb689 100644 --- a/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.guardrails.GuardrailViolatedException; import org.apache.cassandra.db.guardrails.Guardrails; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.streaming.messages.IncomingStreamMessage; import org.apache.cassandra.streaming.messages.KeepAliveMessage; import org.apache.cassandra.streaming.messages.StreamMessage; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -54,9 +55,9 @@ public class StreamDeserializingTask implements Runnable public void run() { StreamingDataInputPlus input = channel.in(); + StreamMessage message = null; try { - StreamMessage message; while (null != (message = StreamMessage.deserialize(input, messagingVersion))) { // keep-alives don't necessarily need to be tied to a session (they could be arrive before or after @@ -93,6 +94,8 @@ public class StreamDeserializingTask implements Runnable catch (Throwable t) { JVMStabilityInspector.inspectThrowable(t); + if ((session == null || 
session.isFailedOrAborted()) && message instanceof IncomingStreamMessage) + t = ((IncomingStreamMessage) message).stream.abort(t); if (session != null) { session.onError(t); diff --git a/test/distributed/org/apache/cassandra/distributed/test/streaming/IndexBuildFailsAfterStreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/streaming/IndexBuildFailsAfterStreamingTest.java new file mode 100644 index 0000000000..767689f8cc --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/streaming/IndexBuildFailsAfterStreamingTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test.streaming; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.index.SecondaryIndexManager; + +import static net.bytebuddy.implementation.MethodDelegation.to; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesNoArguments; +import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; +import static org.apache.cassandra.distributed.shared.AssertUtils.row; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class IndexBuildFailsAfterStreamingTest extends TestBaseImpl +{ + @Test + public void test() throws IOException + { + try (Cluster cluster = init(Cluster.build(2) + .withInstanceInitializer(BBHelper::install) + .withConfig(c -> c.with(Feature.values()) + .set("stream_entire_sstables", false) + .set("disk_failure_policy", "die")) + .start())) + { + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (p int, c int, v int, PRIMARY KEY(p, c))")); + cluster.schemaChange(withKeyspace("CREATE INDEX idx ON %s.tbl(v)")); + + for (int i = 0; i < 100; i++) + cluster.get(1).executeInternal(withKeyspace("insert into %s.tbl (p, c, v) values (?, ?, ?)"), i, i, i); + cluster.get(1).flush(KEYSPACE); + cluster.get(2).runOnInstance(() -> BBHelper.enabled.set(true)); + + // pre-existing weird behaviour - nodetool repair fails, but the sstables are actually streamed & live on
node2: + cluster.get(2).runOnInstance(() -> assertTrue(Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().isEmpty())); + cluster.get(1).nodetoolResult("repair", KEYSPACE).asserts().failure(); + cluster.get(2).runOnInstance(() -> assertFalse(Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().isEmpty())); + for (int i = 0; i < 100; i++) + assertRows(cluster.get(2).executeInternal(withKeyspace("select * from %s.tbl where p = ? and c = ?"), i, i), row(i, i, i)); + + assertRows(cluster.get(1).executeInternal("select * from system.\"IndexInfo\" where table_name=? and index_name=?", KEYSPACE, "idx"), row(KEYSPACE, "idx", null)); + // index not built: + assertEquals(0, cluster.get(2).executeInternal("select * from system.\"IndexInfo\" where table_name=? and index_name=?", KEYSPACE, "idx").length); + } + } + + public static class BBHelper + { + public static void install(ClassLoader classLoader, int num) + { + if (num == 2) + { + new ByteBuddy().rebase(SecondaryIndexManager.class) + .method(named("calculateIndexingPageSize").and(takesNoArguments())) + .intercept(to(BBHelper.class)) + .make() + .load(classLoader, ClassLoadingStrategy.Default.INJECTION); + } + } + public static AtomicBoolean enabled = new AtomicBoolean(); + public static int calculateIndexingPageSize(@SuperCall Callable<Integer> zuper) throws Exception + { + if (enabled.get()) + throw new RuntimeException("On purpose fail 2i build"); + return zuper.call(); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedAfterReceivingTest.java b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedAfterReceivingTest.java new file mode 100644 index 0000000000..acffd1eee1 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedAfterReceivingTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.distributed.test.streaming; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Test; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.utils.concurrent.CountDownLatch; + +import static net.bytebuddy.implementation.MethodDelegation.to; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +public class StreamFailedAfterReceivingTest extends TestBaseImpl +{ + @Test + public void zcsTest() throws 
IOException, ExecutionException, InterruptedException + { + leftoverFilesTest(true); + } + + @Test + public void nozcsTest() throws IOException, ExecutionException, InterruptedException + { + leftoverFilesTest(false); + } + + public void leftoverFilesTest(boolean zcs) throws IOException, ExecutionException, InterruptedException + { + try (Cluster cluster = Cluster.build(2) + .withInstanceInitializer(BBHelper::install) + .withConfig(c -> c.with(Feature.values()) + .set("stream_entire_sstables", zcs) + .set("autocompaction_on_startup_enabled", false) + .set("disk_failure_policy", "die")) + .start()) + { + init(cluster); + + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY, x int)")); + cluster.forEach(i -> i.nodetoolResult("disableautocompaction").asserts().success()); + IInvokableInstance node1 = cluster.get(1); + IInvokableInstance node2 = cluster.get(2); + for (int i = 1; i <= 1000000; i++) + { + node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, x) VALUES (?,?)"), i, i); + if (i % 100000 == 0) + node1.flush(KEYSPACE); + } + node1.flush(KEYSPACE); + node2.runOnInstance(() -> BBHelper.enabled.set(true)); + cluster.setUncaughtExceptionsFilter((e) -> e.getClass().getName().contains("TransactionAlreadyCompletedException")); + node1.nodetoolResult("repair", "-pr", "-full", KEYSPACE, "tbl").asserts().failure(); + node2.runOnInstance(() -> BBHelper.cdl.awaitUninterruptibly()); + node2.runOnInstance(() -> BBHelper.enabled.set(false)); + node2.shutdown().get(); + node2.startup(); + } + } + + + public static class BBHelper + { + public static void install(ClassLoader classLoader, Integer num) + { + if (num == 2) + { + // in this case we need to throw after trackNew:ing the sstable, but before it is finished + new ByteBuddy().rebase(LifecycleTransaction.class) + .method(named("trackNew").and(takesArguments(1))) + .intercept(to(StreamFailedAfterReceivingTest.BBHelper.class)) + .make() + .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION); + } + } + + static AtomicInteger waiting = new AtomicInteger(); + static AtomicBoolean enabled = new AtomicBoolean(); + static CountDownLatch cdl = CountDownLatch.newCountDownLatch(1); + + public static void trackNew(SSTable sstable, @SuperCall Callable<Void> zuper) throws Exception + { + zuper.call(); + if (enabled.get()) + { + if (waiting.incrementAndGet() > 4) + throw new RuntimeException(); + + // using a sleep instead of a horrible nesting of latches - this should + // not make the test flaky, just might flakily pass without hitting the + // right condition + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + cdl.decrement(); + } + } + } + + @Test + public void basicStreamTest() throws IOException + { + try (Cluster cluster = init(Cluster.build(2) + .withConfig(c -> c.with(Feature.values()) + .set("stream_entire_sstables", false) + .set("autocompaction_on_startup_enabled", false) + .set("disk_failure_policy", "die")) + .start())) + { + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY, x int)")); + cluster.forEach(i -> i.nodetoolResult("disableautocompaction").asserts().success()); + IInvokableInstance node1 = cluster.get(1); + IInvokableInstance node2 = cluster.get(2); + for (int i = 1; i <= 1000; i++) + { + node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, x) VALUES (?,?)"), i, i); + if (i % 100 == 0) + node1.flush(KEYSPACE); + } + node1.flush(KEYSPACE); + + node1.nodetoolResult("repair", "-pr", "-full", KEYSPACE, "tbl").asserts().success(); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedAfterTransferTest.java b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedAfterTransferTest.java new file mode 100644 index 0000000000..b91cbf60a9 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedAfterTransferTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.streaming; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Test; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.io.util.File; + +import static net.bytebuddy.implementation.MethodDelegation.to; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static 
org.junit.Assert.assertEquals; + +public class StreamFailedAfterTransferTest extends TestBaseImpl +{ + @Test + public void throwAfterTransferOwnership() throws IOException + { + try (Cluster cluster = Cluster.build(2) + .withInstanceInitializer(BBHelper::install) + .withConfig(c -> c.with(Feature.values()) + .set("stream_entire_sstables", false) + .set("disk_failure_policy", "die")) + .start()) + { + init(cluster); + + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY, x int) with compaction = {'class':'SizeTieredCompactionStrategy', 'enabled':'false'}")); + + IInvokableInstance node1 = cluster.get(1); + IInvokableInstance node2 = cluster.get(2); + for (int i = 1; i <= 1000; i++) + { + node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, x) VALUES (?,?)"), i, i); + if (i % 100 == 0) + node1.flush(KEYSPACE); + } + node1.flush(KEYSPACE); + cluster.setUncaughtExceptionsFilter((e) -> e.getClass().getName().contains("TransactionAlreadyCompletedException")); + node1.nodetoolResult("repair", "-pr", "-full", KEYSPACE, "tbl").asserts().failure(); + + node2.runOnInstance(() -> { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl"); + LifecycleTransaction.waitForDeletions(); + for (File f : cfs.getDirectories().getCFDirectories()) + { + try + { + int i = 0; + while (f.list().length > 0 && i++ < 20) + { + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + LifecycleTransaction.waitForDeletions(); + } + File [] files = f.list(); + assertEquals(Arrays.toString(files), 0, files.length); + + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + }); + } + } + + + + public static class BBHelper + { + public static void install(ClassLoader classLoader, Integer num) + { + if (num == 2) + { + // in this case we need to throw after takeOwnership has been called on the transaction + new ByteBuddy().rebase(LifecycleTransaction.class) + .method(named("takeOwnership").and(takesArguments(1))) + 
.intercept(to(BBHelper.class)) + .make() + .load(classLoader, ClassLoadingStrategy.Default.INJECTION); + } + } + + static AtomicInteger cnt = new AtomicInteger(); + public static void takeOwnership(ILifecycleTransaction txn, @SuperCall Callable<Void> zuper) throws Exception + { + zuper.call(); + if (cnt.incrementAndGet() > 3) + throw new RuntimeException(); + } + } +} diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java index fb3cccace2..53459a4af6 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java @@ -20,8 +20,10 @@ package org.apache.cassandra.db.lifecycle; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import com.google.common.util.concurrent.Uninterruptibles; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -33,6 +35,7 @@ import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState; import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState.Action; +import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.MockSchema; import org.apache.cassandra.utils.Pair; @@ -200,6 +203,54 @@ public class LifecycleTransactionTest extends AbstractTransactionalTest Assert.assertTrue(failed); } + @Test + public void testTransferAbort() + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + List<SSTableReader> readers = readers(0, 4, cfs); + LifecycleTransaction sharedTxn = LifecycleTransaction.offline(OperationType.UNKNOWN); + LifecycleTransaction txn = 
LifecycleTransaction.offline(OperationType.UNKNOWN); + readers.forEach(txn::trackNew); + txn.prepareToCommit(); + sharedTxn.takeOwnership(txn); + txn.commit(); + sharedTxn.abort(); + assertFilesGone(readers); + } + + private void assertFilesGone(List<SSTableReader> readers) + { + readers.forEach(s -> { + int i = 0; + while (s.descriptor.fileFor(SSTableFormat.Components.DATA).exists() && i++ < 20) + { + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + LifecycleTransaction.waitForDeletions(); + } + Assert.assertFalse(s.descriptor.fileFor(SSTableFormat.Components.DATA).exists()); + }); + } + + @Test + public void testTransferAbortEarly() + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + List<SSTableReader> readers = readers(0, 4, cfs); + LifecycleTransaction sharedTxn = LifecycleTransaction.offline(OperationType.UNKNOWN); + LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.UNKNOWN); + readers.forEach(txn::trackNew); + txn.prepareToCommit(); + txn.abort(); + try + { + sharedTxn.takeOwnership(txn); + Assert.fail("child txn is aborted, we should not take ownership"); + } + catch (Exception ignored) {} + + assertFilesGone(readers); + } + private static void testBadUpdate(LifecycleTransaction txn, SSTableReader update, boolean original) { boolean failed = false; diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java index abb6c2dc1b..a87878c70b 100644 --- a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java +++ b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java @@ -40,8 +40,9 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.RowUpdateBuilder; import org.apache.cassandra.db.compaction.CompactionManager; +import 
org.apache.cassandra.db.lifecycle.StreamingLifecycleTransaction; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.locator.InetAddressAndPort; @@ -163,10 +164,11 @@ public class CassandraEntireSSTableStreamWriterTest CassandraEntireSSTableStreamReader reader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id, peer, session.planId(), false, 0, 0, 0, null), header, session); - SSTableMultiWriter sstableWriter = reader.read(new DataInputBuffer(serializedFile.nioBuffer(), false)); - Collection<SSTableReader> newSstables = sstableWriter.finished(); - + SSTableTxnSingleStreamWriter sstableWriter = (SSTableTxnSingleStreamWriter) reader.read(new DataInputBuffer(serializedFile.nioBuffer(), false)); + StreamingLifecycleTransaction stt = new StreamingLifecycleTransaction(); + Collection<SSTableReader> newSstables = sstableWriter.transferOwnershipTo(stt); assertEquals(1, newSstables.size()); + stt.abort(); } } diff --git a/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java b/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java index 55793c9cac..3f19da0ad7 100644 --- a/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java +++ b/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java @@ -52,10 +52,12 @@ import org.apache.cassandra.db.RowUpdateBuilder; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import 
org.apache.cassandra.db.lifecycle.StreamingLifecycleTransaction; import org.apache.cassandra.dht.ByteOrderedPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter; import org.apache.cassandra.io.sstable.SSTableUtils; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.indexsummary.IndexSummaryManager; @@ -236,9 +238,11 @@ public class EntireSSTableStreamConcurrentComponentMutationTest { CassandraStreamHeader header = CassandraStreamHeader.serializer.deserialize(in, MessagingService.current_version); CassandraEntireSSTableStreamReader reader = new CassandraEntireSSTableStreamReader(messageHeader, header, session); - SSTableReader streamedSSTable = Iterables.getOnlyElement(reader.read(in).finished()); - + StreamingLifecycleTransaction stt = new StreamingLifecycleTransaction(); + SSTableTxnSingleStreamWriter writer = (SSTableTxnSingleStreamWriter) reader.read(in); + SSTableReader streamedSSTable = Iterables.getOnlyElement(writer.transferOwnershipTo(stt)); SSTableUtils.assertContentEquals(sstable, streamedSSTable); + stt.abort(); } } diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java index 483d109ec5..d6ce95a90e 100644 --- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java +++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java @@ -56,7 +56,6 @@ import org.apache.cassandra.cql3.restrictions.StatementRestrictions; import org.apache.cassandra.cql3.statements.ModificationStatement; import org.apache.cassandra.db.ColumnFamilyStore.FlushReason; import org.apache.cassandra.db.filter.RowFilter; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.apache.cassandra.cql3.statements.schema.IndexTarget; import org.apache.cassandra.db.CassandraWriteContext; import 
org.apache.cassandra.db.ColumnFamilyStore; @@ -68,6 +67,7 @@ import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.WriteContext; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.UTF8Type; @@ -1287,7 +1287,7 @@ public class CustomIndexTest extends CQLTester } @Override - public SSTableFlushObserver getFlushObserver(Descriptor descriptor, LifecycleNewTracker tracker) + public SSTableFlushObserver getFlushObserver(Descriptor descriptor, ILifecycleTransaction txn) { return new SSTableFlushObserver() { @@ -1672,11 +1672,11 @@ public class CustomIndexTest extends CQLTester } @Override - public SSTableFlushObserver getFlushObserver(Descriptor descriptor, LifecycleNewTracker tracker, TableMetadata tableMetadata) + public SSTableFlushObserver getFlushObserver(Descriptor descriptor, ILifecycleTransaction txn, TableMetadata tableMetadata) { Set<SSTableFlushObserver> observers = indexes.values() .stream() - .map(i -> i.getFlushObserver(descriptor, tracker)) + .map(i -> i.getFlushObserver(descriptor, txn)) .filter(Objects::nonNull) .collect(Collectors.toSet()); diff --git a/test/unit/org/apache/cassandra/index/StubIndexGroup.java b/test/unit/org/apache/cassandra/index/StubIndexGroup.java index 22dfbe262b..ec7f457d34 100644 --- a/test/unit/org/apache/cassandra/index/StubIndexGroup.java +++ b/test/unit/org/apache/cassandra/index/StubIndexGroup.java @@ -28,7 +28,7 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.WriteContext; import org.apache.cassandra.db.filter.RowFilter; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import 
org.apache.cassandra.db.memtable.Memtable; import org.apache.cassandra.index.transactions.IndexTransaction; import org.apache.cassandra.io.sstable.Component; @@ -93,7 +93,7 @@ public class StubIndexGroup implements Index.Group } @Override - public SSTableFlushObserver getFlushObserver(Descriptor descriptor, LifecycleNewTracker tracker, TableMetadata tableMetadata) + public SSTableFlushObserver getFlushObserver(Descriptor descriptor, ILifecycleTransaction txn, TableMetadata tableMetadata) { return null; } diff --git a/test/unit/org/apache/cassandra/io/sstable/ScrubTest.java b/test/unit/org/apache/cassandra/io/sstable/ScrubTest.java index 323cd01fbb..bd8f846572 100644 --- a/test/unit/org/apache/cassandra/io/sstable/ScrubTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/ScrubTest.java @@ -69,7 +69,7 @@ import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.UUIDType; @@ -847,9 +847,9 @@ public class ScrubTest private static class TestMultiWriter extends SimpleSSTableMultiWriter { - TestMultiWriter(SSTableWriter writer, LifecycleNewTracker lifecycleNewTracker) + TestMultiWriter(SSTableWriter writer, ILifecycleTransaction txn) { - super(writer, lifecycleNewTracker); + super(writer, txn); } } diff --git a/test/unit/org/apache/cassandra/streaming/StreamReaderTest.java b/test/unit/org/apache/cassandra/streaming/StreamReaderTest.java index 0a854c0744..c156048276 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamReaderTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamReaderTest.java @@ -51,6 +51,7 @@ import 
org.apache.cassandra.dht.Token; import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.SSTableSimpleIterator; +import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.Version; @@ -499,7 +500,7 @@ public class StreamReaderTest super(header, streamHeader, session); } - protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, TimeUUID pendingRepair, SSTableFormat<?,?> format) throws IOException + protected SSTableTxnSingleStreamWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, TimeUUID pendingRepair, SSTableFormat<?,?> format) throws IOException { return super.createWriter(cfs, totalSize, repairedAt, pendingRepair, format); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org