This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f6c1002e44 Stream individual files in their own transactions and hand 
over ownership to a parent transaction on completion
f6c1002e44 is described below

commit f6c1002e44ed73fbcf940242217f5d20e0bf2d7d
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Wed Jun 11 08:08:59 2025 +0200

    Stream individual files in their own transactions and hand over ownership 
to a parent transaction on completion
    
    patch by Marcus Eriksson; reviewed by Caleb Rackliffe and Jon Meredith for 
CASSANDRA-20728
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  77 ++++++++---
 .../db/compaction/AbstractCompactionStrategy.java  |   6 +-
 .../db/compaction/AbstractStrategyHolder.java      |   4 +-
 .../db/compaction/CompactionStrategyHolder.java    |   6 +-
 .../db/compaction/CompactionStrategyManager.java   |   6 +-
 .../db/compaction/PendingRepairHolder.java         |   6 +-
 .../db/compaction/UnifiedCompactionStrategy.java   |   6 +-
 .../db/compaction/unified/ShardedMultiWriter.java  |  12 +-
 .../db/lifecycle/ILifecycleTransaction.java        |   7 +-
 .../db/lifecycle/LifecycleNewTracker.java          |  47 -------
 .../db/lifecycle/LifecycleTransaction.java         |  18 +--
 .../org/apache/cassandra/db/lifecycle/LogFile.java |  15 ++
 .../cassandra/db/lifecycle/LogTransaction.java     |  12 ++
 .../lifecycle/StreamingLifecycleTransaction.java   |  68 +++++++++
 .../CassandraEntireSSTableStreamReader.java        |  50 +++++--
 .../db/streaming/CassandraIncomingFile.java        |  12 +-
 .../db/streaming/CassandraStreamReader.java        |  17 ++-
 .../db/streaming/CassandraStreamReceiver.java      |  50 +------
 src/java/org/apache/cassandra/index/Index.java     |  10 +-
 .../org/apache/cassandra/index/IndexRegistry.java  |   4 +-
 .../cassandra/index/SingletonIndexGroup.java       |   6 +-
 .../index/accord/MemtableIndexManager.java         |   4 +-
 .../cassandra/index/accord/RouteJournalIndex.java  |   8 +-
 .../index/accord/RouteMemtableIndexManager.java    |   6 +-
 .../cassandra/index/sai/StorageAttachedIndex.java  |   4 +-
 .../index/sai/StorageAttachedIndexGroup.java       |   6 +-
 .../index/sai/disk/StorageAttachedIndexWriter.java |  16 +--
 .../index/sai/disk/format/IndexDescriptor.java     |   6 +-
 .../index/sai/disk/format/OnDiskFormat.java        |   8 +-
 .../index/sai/disk/v1/V1OnDiskFormat.java          |   8 +-
 .../index/sai/memory/MemtableIndexManager.java     |   6 +-
 .../org/apache/cassandra/index/sasi/SASIIndex.java |   6 +-
 .../io/sstable/RangeAwareSSTableWriter.java        |  20 ++-
 .../io/sstable/SSTableTxnSingleStreamWriter.java   | 145 +++++++++++++++++++
 .../io/sstable/SSTableZeroCopyWriter.java          |   7 +-
 .../io/sstable/SimpleSSTableMultiWriter.java       |  16 +--
 .../cassandra/io/sstable/format/SSTableFormat.java |   3 +-
 .../cassandra/io/sstable/format/SSTableWriter.java |  30 ++--
 .../io/sstable/format/SortedTableWriter.java       |   6 +-
 .../io/sstable/format/big/BigTableWriter.java      |  16 +--
 .../io/sstable/format/bti/BtiTableWriter.java      |  12 +-
 .../apache/cassandra/streaming/IncomingStream.java |   2 +-
 .../streaming/StreamDeserializingTask.java         |   5 +-
 .../IndexBuildFailsAfterStreamingTest.java         |  99 +++++++++++++
 .../streaming/StreamFailedAfterReceivingTest.java  | 154 +++++++++++++++++++++
 .../streaming/StreamFailedAfterTransferTest.java   | 127 +++++++++++++++++
 .../db/lifecycle/LifecycleTransactionTest.java     |  51 +++++++
 .../CassandraEntireSSTableStreamWriterTest.java    |  10 +-
 ...TableStreamConcurrentComponentMutationTest.java |   8 +-
 .../apache/cassandra/index/CustomIndexTest.java    |   8 +-
 .../org/apache/cassandra/index/StubIndexGroup.java |   4 +-
 .../org/apache/cassandra/io/sstable/ScrubTest.java |   6 +-
 .../cassandra/streaming/StreamReaderTest.java      |   3 +-
 54 files changed, 970 insertions(+), 280 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6fb94d9626..1efac73e11 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Stream individual files in their own transactions and hand over ownership 
to a parent transaction on completion (CASSANDRA-20728)
  * Limit the number of held heap dumps to not consume disk space excessively 
(CASSANDRA-20457)
  * Accord: BEGIN TRANSACTIONs IF condition logic does not properly support 
meaningless emptiness and null values (CASSANDRA-20667)
  * Accord: startup race condition where accord journal tries to access the 2i 
index before its ready (CASSANDRA-20686)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index e81c0d7be8..a499bc42f2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -85,7 +85,7 @@ import 
org.apache.cassandra.db.compaction.CompactionStrategyManager;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.DataLimits;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.Tracker;
@@ -652,19 +652,19 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean, Memtable.Owner
         return memtableFactory.streamFromMemtable();
     }
 
-    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient, 
SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient, 
SerializationHeader header, ILifecycleTransaction txn)
     {
-        return createSSTableMultiWriter(descriptor, keyCount, repairedAt, 
pendingRepair, isTransient, null, 0, header, lifecycleNewTracker);
+        return createSSTableMultiWriter(descriptor, keyCount, repairedAt, 
pendingRepair, isTransient, null, 0, header, txn);
     }
 
-    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient, 
IntervalSet<CommitLogPosition> commitLogPositions, SerializationHeader header, 
LifecycleNewTracker lifecycleNewTracker)
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient, 
IntervalSet<CommitLogPosition> commitLogPositions, SerializationHeader header, 
ILifecycleTransaction txn)
     {
-        return createSSTableMultiWriter(descriptor, keyCount, repairedAt, 
pendingRepair, isTransient, commitLogPositions, 0, header, lifecycleNewTracker);
+        return createSSTableMultiWriter(descriptor, keyCount, repairedAt, 
pendingRepair, isTransient, commitLogPositions, 0, header, txn);
     }
 
-    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient, 
IntervalSet<CommitLogPosition> commitLogPositions, int sstableLevel, 
SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient, 
IntervalSet<CommitLogPosition> commitLogPositions, int sstableLevel, 
SerializationHeader header, ILifecycleTransaction txn)
     {
-        return 
getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, 
repairedAt, pendingRepair, isTransient, commitLogPositions, sstableLevel, 
header, indexManager.listIndexGroups(), lifecycleNewTracker);
+        return 
getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, 
repairedAt, pendingRepair, isTransient, commitLogPositions, sstableLevel, 
header, indexManager.listIndexGroups(), txn);
     }
 
     public boolean supportsEarlyOpen()
@@ -2410,22 +2410,67 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean, Memtable.Owner
         }
     }
 
-    private static final LifecycleNewTracker DO_NOT_TRACK = new 
LifecycleNewTracker()
+    private static final ILifecycleTransaction DO_NOT_TRACK = new 
ILifecycleTransaction()
     {
-        public void trackNew(SSTable table)
+        @Override
+        public void trackNew(SSTable sstable)
         {
-            // not tracking
-        }
 
-        public void untrackNew(SSTable table)
-        {
-            // not tracking
         }
 
-        public OperationType opType()
+        @Override
+        public void untrackNew(SSTable sstable)
         {
-            return OperationType.FLUSH;
+
         }
+
+        @Override
+        public OperationType opType() {return null;}
+
+        @Override
+        public void checkpoint() {}
+
+        @Override
+        public void update(SSTableReader reader, boolean original) {}
+
+        @Override
+        public void update(Collection<SSTableReader> readers, boolean 
original) {}
+
+        @Override
+        public SSTableReader current(SSTableReader reader) {return null;}
+
+        @Override
+        public void obsolete(SSTableReader reader) {}
+
+        @Override
+        public void obsoleteOriginals() {}
+
+        @Override
+        public Set<SSTableReader> originals() {return Set.of();}
+
+        @Override
+        public boolean isObsolete(SSTableReader reader) {return false;}
+
+        @Override
+        public boolean isOffline() {return false;}
+
+        @Override
+        public TimeUUID opId() {return null;}
+
+        @Override
+        public void cancel(SSTableReader removedSSTable) {}
+
+        @Override
+        public Throwable commit(Throwable accumulate) {return null;}
+
+        @Override
+        public Throwable abort(Throwable accumulate) {return null;}
+
+        @Override
+        public void prepareToCommit() {}
+
+        @Override
+        public void close() {}
     };
 
     /**
diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 99a509c5f2..def444f9a9 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -36,7 +36,7 @@ import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -562,7 +562,7 @@ public abstract class AbstractCompactionStrategy
                                                        int sstableLevel,
                                                        SerializationHeader 
header,
                                                        Collection<Index.Group> 
indexGroups,
-                                                       LifecycleNewTracker 
lifecycleNewTracker)
+                                                       ILifecycleTransaction 
txn)
     {
         return SimpleSSTableMultiWriter.create(descriptor,
                                                keyCount,
@@ -574,7 +574,7 @@ public abstract class AbstractCompactionStrategy
                                                sstableLevel,
                                                header,
                                                indexGroups,
-                                               lifecycleNewTracker, cfs);
+                                               txn, cfs);
     }
 
     public boolean supportsEarlyOpen()
diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java 
b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
index 2db7e1db60..4bc7146f83 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.index.Index;
@@ -200,7 +200,7 @@ public abstract class AbstractStrategyHolder
                                                                 int 
sstableLevel,
                                                                 
SerializationHeader header,
                                                                 
Collection<Index.Group> indexGroups,
-                                                                
LifecycleNewTracker lifecycleNewTracker);
+                                                                
ILifecycleTransaction txn);
 
     /**
      * Return the directory index the given compaction strategy belongs to, or 
-1
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
index 9bc4d6d7cb..af5c749929 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.index.Index;
@@ -230,7 +230,7 @@ public class CompactionStrategyHolder extends 
AbstractStrategyHolder
                                                        int sstableLevel,
                                                        SerializationHeader 
header,
                                                        Collection<Index.Group> 
indexGroups,
-                                                       LifecycleNewTracker 
lifecycleNewTracker)
+                                                       ILifecycleTransaction 
txn)
     {
         if (isRepaired)
         {
@@ -255,7 +255,7 @@ public class CompactionStrategyHolder extends 
AbstractStrategyHolder
                                                  sstableLevel,
                                                  header,
                                                  indexGroups,
-                                                 lifecycleNewTracker);
+                                                 txn);
     }
 
     @Override
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index dfb908425a..b8eaa5bd81 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -54,7 +54,7 @@ import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.commitlog.IntervalSet;
 import org.apache.cassandra.db.compaction.AbstractStrategyHolder.TaskSupplier;
 import org.apache.cassandra.db.compaction.PendingRepairManager.CleanupTask;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.dht.Range;
@@ -1257,7 +1257,7 @@ public class CompactionStrategyManager implements 
INotificationConsumer
                                                        int sstableLevel,
                                                        SerializationHeader 
header,
                                                        Collection<Index.Group> 
indexGroups,
-                                                       LifecycleNewTracker 
lifecycleNewTracker)
+                                                       ILifecycleTransaction 
txn)
     {
         SSTable.validateRepairedMetadata(repairedAt, pendingRepair, 
isTransient);
         maybeReloadDiskBoundaries();
@@ -1273,7 +1273,7 @@ public class CompactionStrategyManager implements 
INotificationConsumer
                                                                                
               sstableLevel,
                                                                                
               header,
                                                                                
               indexGroups,
-                                                                               
               lifecycleNewTracker);
+                                                                               
               txn);
         }
         finally
         {
diff --git 
a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java 
b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
index 1f2ff4a1a1..45e8753394 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.index.Index;
@@ -250,7 +250,7 @@ public class PendingRepairHolder extends 
AbstractStrategyHolder
                                                        int sstableLevel,
                                                        SerializationHeader 
header,
                                                        Collection<Index.Group> 
indexGroups,
-                                                       LifecycleNewTracker 
lifecycleNewTracker)
+                                                       ILifecycleTransaction 
txn)
     {
         Preconditions.checkArgument(repairedAt == 
ActiveRepairService.UNREPAIRED_SSTABLE,
                                     "PendingRepairHolder can't create 
sstablewriter with repaired at set");
@@ -267,7 +267,7 @@ public class PendingRepairHolder extends 
AbstractStrategyHolder
                                                  sstableLevel,
                                                  header,
                                                  indexGroups,
-                                                 lifecycleNewTracker);
+                                                 txn);
     }
 
     @Override
diff --git 
a/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java
index c23e600e7b..d44a9f3288 100644
--- a/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java
@@ -46,7 +46,7 @@ import org.apache.cassandra.db.compaction.unified.Controller;
 import org.apache.cassandra.db.compaction.unified.ShardedMultiWriter;
 import org.apache.cassandra.db.compaction.unified.UnifiedCompactionTask;
 import org.apache.cassandra.db.lifecycle.CompositeLifecycleTransaction;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.PartialLifecycleTransaction;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -303,7 +303,7 @@ public class UnifiedCompactionStrategy extends 
AbstractCompactionStrategy
                                                        int sstableLevel,
                                                        SerializationHeader 
header,
                                                        Collection<Index.Group> 
indexGroups,
-                                                       LifecycleNewTracker 
lifecycleNewTracker)
+                                                       ILifecycleTransaction 
txn)
     {
         ShardManager shardManager = getShardManager();
         double flushDensity = cfs.metric.flushSizeOnDisk.get() * 
shardManager.shardSetCoverage() / shardManager.localSpaceCoverage();
@@ -317,7 +317,7 @@ public class UnifiedCompactionStrategy extends 
AbstractCompactionStrategy
                                       commitLogPositions,
                                       header,
                                       indexGroups,
-                                      lifecycleNewTracker,
+                                      txn,
                                       boundaries);
     }
 
diff --git 
a/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java 
b/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java
index a5b5df9e49..ab5465df25 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.commitlog.IntervalSet;
 import org.apache.cassandra.db.compaction.ShardTracker;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -64,7 +64,7 @@ public class ShardedMultiWriter implements SSTableMultiWriter
     private final IntervalSet<CommitLogPosition> commitLogPositions;
     private final SerializationHeader header;
     private final Collection<Index.Group> indexGroups;
-    private final LifecycleNewTracker lifecycleNewTracker;
+    private final ILifecycleTransaction txn;
     private final ShardTracker boundaries;
     private final SSTableWriter[] writers;
     private int currentWriter;
@@ -78,7 +78,7 @@ public class ShardedMultiWriter implements SSTableMultiWriter
                               IntervalSet<CommitLogPosition> 
commitLogPositions,
                               SerializationHeader header,
                               Collection<Index.Group> indexGroups,
-                              LifecycleNewTracker lifecycleNewTracker,
+                              ILifecycleTransaction txn,
                               ShardTracker boundaries)
     {
         this.cfs = cfs;
@@ -90,7 +90,7 @@ public class ShardedMultiWriter implements SSTableMultiWriter
         this.commitLogPositions = commitLogPositions;
         this.header = header;
         this.indexGroups = indexGroups;
-        this.lifecycleNewTracker = lifecycleNewTracker;
+        this.txn = txn;
         this.boundaries = boundaries;
         this.writers = new SSTableWriter[this.boundaries.count()]; // at least 
one
 
@@ -118,7 +118,7 @@ public class ShardedMultiWriter implements 
SSTableMultiWriter
                          .setSerializationHeader(header)
                          .addDefaultComponents(indexGroups)
                          .setSecondaryIndexGroups(indexGroups)
-                         .build(lifecycleNewTracker, cfs);
+                         .build(txn, cfs);
     }
 
     private long forSplittingKeysBy(long splits) {
@@ -227,7 +227,7 @@ public class ShardedMultiWriter implements 
SSTableMultiWriter
         for (SSTableWriter writer : writers)
             if (writer != null)
             {
-                lifecycleNewTracker.untrackNew(writer);
+                txn.untrackNew(writer);
                 t = writer.abort(t);
             }
         return t;
diff --git 
a/src/java/org/apache/cassandra/db/lifecycle/ILifecycleTransaction.java 
b/src/java/org/apache/cassandra/db/lifecycle/ILifecycleTransaction.java
index f3e9c9b7c5..5da8447917 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/ILifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/ILifecycleTransaction.java
@@ -23,13 +23,18 @@ import java.util.Set;
 
 import com.google.common.collect.Iterables;
 
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.TimeUUID;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
-public interface ILifecycleTransaction extends Transactional, 
LifecycleNewTracker
+public interface ILifecycleTransaction extends Transactional
 {
+    void trackNew(SSTable sstable);
+    void untrackNew(SSTable sstable);
+    OperationType opType();
     void checkpoint();
     void update(SSTableReader reader, boolean original);
     void update(Collection<SSTableReader> readers, boolean original);
diff --git 
a/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java 
b/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java
deleted file mode 100644
index 9a0785c43f..0000000000
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.cassandra.db.lifecycle;
-
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.io.sstable.SSTable;
-
-/**
- * An interface for tracking new sstables added to a LifecycleTransaction, 
possibly through some proxy.
- */
-public interface LifecycleNewTracker
-{
-    /**
-     * Called when a new table is about to be created, so that this table can 
be tracked by a transaction.
-     * @param table - the new table to be tracked
-     */
-    void trackNew(SSTable table);
-
-
-    /**
-     * Called when a new table is no longer required, so that this table can 
be untracked by a transaction.
-     * @param table - the table to be untracked
-     */
-    void untrackNew(SSTable table);
-
-    /**
-     * @return the type of operation tracking these sstables
-     */
-    OperationType opType();
-}
diff --git 
a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java 
b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index 15b0417b38..48b0bc1ffb 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -205,12 +205,7 @@ public class LifecycleTransaction extends 
Transactional.AbstractTransactional im
         }
     }
 
-    public LogTransaction log()
-    {
-        return log;
-    }
-
-    @Override //LifecycleNewTracker
+    @Override
     public OperationType opType()
     {
         return log.type();
@@ -322,6 +317,14 @@ public class LifecycleTransaction extends 
Transactional.AbstractTransactional im
         return accumulate;
     }
 
+    void takeOwnership(ILifecycleTransaction txn)
+    {
+        LifecycleTransaction ltn = (LifecycleTransaction)txn;
+        if (!ltn.obsoletions.isEmpty() || !ltn.originals.isEmpty() || 
!ltn.logged.isEmpty())
+            throw new IllegalStateException("takeOwnership is only supported 
in add-only transactions (streams)");
+        log.takeOwnership(ltn.log);
+    }
+
     private Throwable runOnCommitHooks(Throwable accumulate)
     {
         return runHooks(commitHooks, accumulate);
@@ -646,8 +649,6 @@ public class LifecycleTransaction extends 
Transactional.AbstractTransactional im
         return getFirst(originals, null);
     }
 
-    // LifecycleNewTracker
-
     @Override
     public void trackNew(SSTable table)
     {
@@ -659,7 +660,6 @@ public class LifecycleTransaction extends 
Transactional.AbstractTransactional im
     {
         log.untrackNew(table);
     }
-
     public static boolean removeUnfinishedLeftovers(ColumnFamilyStore cfs)
     {
         return 
LogTransaction.removeUnfinishedLeftovers(cfs.getDirectories().getCFDirectories());
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index 13436b112a..08ba257f16 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -552,4 +552,19 @@ final class LogFile implements AutoCloseable
     {
         return records.isEmpty();
     }
+
+    public void takeOwnership(LogFile txnFile)
+    {
+        if (completed)
+            throw TransactionAlreadyCompletedException.create(getFiles());
+        for (LogRecord record : txnFile.records)
+        {
+            if (record.type != Type.ADD)
+                throw new IllegalStateException("Can only transfer ADD records 
- not " + record + " - " + txnFile.records);
+            File directory = new File(record.absolutePath.get()).parent();
+            String fileName = StringUtils.join(directory, 
File.pathSeparator(), getFileName());
+            replicas.maybeCreateReplica(directory, fileName, onDiskRecords);
+            addRecord(record);
+        }
+    }
 }
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
index 92766de385..b675ad238b 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
@@ -209,6 +209,18 @@ class LogTransaction extends 
Transactional.AbstractTransactional implements Tran
         }
     }
 
+    void takeOwnership(LogTransaction log)
+    {
+        synchronized (lock)
+        {
+            if (state() != State.IN_PROGRESS)
+                throw new IllegalStateException("The LogTransaction getting 
ownership should be IN_PROGRESS, not " + state());
+            if (log.state() != State.READY_TO_COMMIT)
+                throw new IllegalStateException("The LogTransaction giving up 
its ownership should be READY_TO_COMMIT, not " + log.state());
+            txnFile.takeOwnership(log.txnFile);
+        }
+    }
+
     Map<SSTable, LogRecord> makeRemoveRecords(Iterable<SSTableReader> sstables)
     {
         synchronized (lock)
diff --git 
a/src/java/org/apache/cassandra/db/lifecycle/StreamingLifecycleTransaction.java 
b/src/java/org/apache/cassandra/db/lifecycle/StreamingLifecycleTransaction.java
new file mode 100644
index 0000000000..38145b6132
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/db/lifecycle/StreamingLifecycleTransaction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.Collection;
+
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+
+/**
+ * Special, restricted LifecycleTransaction for streaming, synchronizes access 
to the shared transaction
+ * and adds a method to take ownership of a "child-transaction".
+ *
+ * Each incoming file is now its own normal LifecycleTransaction.
+ */
+public class StreamingLifecycleTransaction
+{
+    private final LifecycleTransaction sharedTxn;
+
+    public StreamingLifecycleTransaction()
+    {
+        this.sharedTxn = LifecycleTransaction.offline(OperationType.STREAM);
+    }
+
+    public synchronized Throwable commit(Throwable accumulate)
+    {
+        return sharedTxn.commit(accumulate);
+    }
+
+    public synchronized void update(Collection<SSTableReader> readers)
+    {
+        sharedTxn.update(readers, false);
+    }
+
+    public synchronized void abort()
+    {
+        maybeFail(sharedTxn.abort(null));
+    }
+
+    public synchronized void finish()
+    {
+        sharedTxn.prepareToCommit();
+        sharedTxn.commit();
+    }
+
+    public synchronized void takeOwnership(ILifecycleTransaction txn)
+    {
+        sharedTxn.takeOwnership(txn);
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
 
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
index 97c3b2d4f9..f2f1784a9c 100644
--- 
a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
+++ 
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db.streaming;
 
 import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
 import java.util.Collection;
 import java.util.function.UnaryOperator;
 
@@ -28,12 +29,15 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.IOOptions;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter;
 import org.apache.cassandra.io.sstable.SSTableZeroCopyWriter;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -102,7 +106,7 @@ public class CassandraEntireSSTableStreamReader implements 
IStreamReader
                      prettyPrintMemory(totalSize),
                      cfs.metadata());
 
-        SSTableZeroCopyWriter writer = null;
+        SSTableTxnZeroCopyWriter writer = null;
 
         try
         {
@@ -121,7 +125,7 @@ public class CassandraEntireSSTableStreamReader implements 
IStreamReader
                              prettyPrintMemory(totalSize));
 
                 writer.writeComponent(component, in, length);
-                
session.progress(writer.descriptor.fileFor(component).toString(), 
ProgressInfo.Direction.IN, length, length, length);
+                
session.progress(writer.descriptor().fileFor(component).toString(), 
ProgressInfo.Direction.IN, length, length, length);
                 bytesRead += length;
 
                 logger.debug("[Stream #{}] Finished receiving {} component 
from {}, componentSize = {}, readBytes = {}, totalSize = {}",
@@ -137,7 +141,7 @@ public class CassandraEntireSSTableStreamReader implements 
IStreamReader
                                                                    
.mutateRepairedMetadata(messageHeader.repairedAt, messageHeader.pendingRepair, 
false);
             String description = String.format("level %s and repairedAt time 
%s and pendingRepair %s",
                                                header.sstableLevel, 
messageHeader.repairedAt, messageHeader.pendingRepair);
-            
writer.descriptor.getMetadataSerializer().mutate(writer.descriptor, 
description, transform);
+            
writer.descriptor().getMetadataSerializer().mutate(writer.descriptor(), 
description, transform);
             return writer;
         }
         catch (Throwable e)
@@ -167,14 +171,14 @@ public class CassandraEntireSSTableStreamReader 
implements IStreamReader
         return dir;
     }
 
-    protected SSTableZeroCopyWriter createWriter(ColumnFamilyStore cfs, long 
totalSize, Collection<Component> components) throws IOException
+    protected SSTableTxnZeroCopyWriter createWriter(ColumnFamilyStore cfs, 
long totalSize, Collection<Component> components) throws IOException
     {
         File dataDir = getDataDir(cfs, totalSize);
 
         StreamReceiver streamReceiver = session.getAggregator(tableId);
         assert streamReceiver instanceof CassandraStreamReceiver;
 
-        LifecycleNewTracker lifecycleNewTracker = 
CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).createLifecycleNewTracker();
+        LifecycleTransaction txn = 
LifecycleTransaction.offline(OperationType.STREAM);
 
         Descriptor desc = cfs.newSSTableDescriptor(dataDir, header.version);
 
@@ -190,12 +194,32 @@ public class CassandraEntireSSTableStreamReader 
implements IStreamReader
                                             
DatabaseDescriptor.getFlushCompression());
 
         logger.debug("[Table #{}] {} Components to write: {}", cfs.metadata(), 
desc, components);
-        return desc.getFormat()
-                   .getWriterFactory()
-                   .builder(desc)
-                   .setComponents(components)
-                   .setTableMetadataRef(cfs.metadata)
-                   .setIOOptions(ioOptions)
-                   .createZeroCopyWriter(lifecycleNewTracker, cfs);
+        return new SSTableTxnZeroCopyWriter(txn, desc.getFormat()
+                                                     .getWriterFactory()
+                                                     .builder(desc)
+                                                     .setComponents(components)
+                                                     
.setTableMetadataRef(cfs.metadata)
+                                                     .setIOOptions(ioOptions)
+                                                     
.createZeroCopyWriter(txn, cfs));
+    }
+
+    public static class SSTableTxnZeroCopyWriter extends 
SSTableTxnSingleStreamWriter
+    {
+        private final SSTableZeroCopyWriter writer;
+        public SSTableTxnZeroCopyWriter(ILifecycleTransaction txn, 
SSTableZeroCopyWriter writer)
+        {
+            super(txn, writer);
+            this.writer = writer;
+        }
+
+        public void writeComponent(Component component, DataInputPlus in, long 
length) throws ClosedChannelException
+        {
+            writer.writeComponent(component, in, length);
+        }
+
+        public Descriptor descriptor()
+        {
+            return writer.descriptor;
+        }
     }
 }
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
index e8a6fbcc7c..3498f28b61 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.IncomingStream;
@@ -43,7 +44,7 @@ public class CassandraIncomingFile implements IncomingStream
     private final StreamSession session;
     private final StreamMessageHeader header;
 
-    private volatile SSTableMultiWriter sstable;
+    private volatile SSTableTxnSingleStreamWriter sstable;
     private volatile long size = -1;
     private volatile int numFiles = 1;
 
@@ -84,7 +85,14 @@ public class CassandraIncomingFile implements IncomingStream
             reader = new CassandraStreamReader(header, streamHeader, session);
 
         size = streamHeader.size();
-        sstable = reader.read(in);
+        sstable = (SSTableTxnSingleStreamWriter)reader.read(in);
+    }
+
+    public synchronized Throwable abort(Throwable t)
+    {
+        if (sstable != null)
+            t = sstable.abort(t);
+        return t;
     }
 
     @Override
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
index af4b4dbc18..bab8d5a84b 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -35,7 +35,9 @@ import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.DeserializationHelper;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.Row;
@@ -47,6 +49,7 @@ import org.apache.cassandra.exceptions.UnknownColumnException;
 import org.apache.cassandra.io.sstable.RangeAwareSSTableWriter;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
+import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.Version;
@@ -172,7 +175,7 @@ public class CassandraStreamReader implements IStreamReader
     {
         return header != null? header.toHeader(metadata) : null; //pre-3.0 
sstable have no SerializationHeader
     }
-    protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long 
totalSize, long repairedAt, TimeUUID pendingRepair, SSTableFormat<?, ?> format) 
throws IOException
+    protected SSTableTxnSingleStreamWriter createWriter(ColumnFamilyStore cfs, 
long totalSize, long repairedAt, TimeUUID pendingRepair, SSTableFormat<?, ?> 
format) throws IOException
     {
         Directories.DataDirectory localDir = 
cfs.getDirectories().getWriteableLocation(totalSize);
         if (localDir == null)
@@ -180,10 +183,14 @@ public class CassandraStreamReader implements 
IStreamReader
 
         StreamReceiver streamReceiver = session.getAggregator(tableId);
         Preconditions.checkState(streamReceiver instanceof 
CassandraStreamReceiver);
-        LifecycleNewTracker lifecycleNewTracker = 
CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).createLifecycleNewTracker();
+        ILifecycleTransaction txn = createTxn();
+        RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, 
estimatedKeys, repairedAt, pendingRepair, false, format, sstableLevel, 
totalSize, txn, getHeader(cfs.metadata()));
+        return new SSTableTxnSingleStreamWriter(txn, writer);
+    }
 
-        RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, 
estimatedKeys, repairedAt, pendingRepair, false, format, sstableLevel, 
totalSize, lifecycleNewTracker, getHeader(cfs.metadata()));
-        return writer;
+    private ILifecycleTransaction createTxn()
+    {
+        return LifecycleTransaction.offline(OperationType.STREAM);
     }
 
     protected long totalSize()
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
index 480fcbe951..fc3e9044cf 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
@@ -35,10 +35,8 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.StreamingLifecycleTransaction;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.ThrottledUnfilteredIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -47,8 +45,7 @@ import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.service.accord.AccordTopology;
@@ -80,7 +77,7 @@ public class CassandraStreamReceiver implements StreamReceiver
     private final StreamSession session;
 
     // Transaction tracking new files received
-    private final LifecycleTransaction txn;
+    private final StreamingLifecycleTransaction txn;
 
     //  holds references to SSTables received
     protected final Collection<SSTableReader> sstables;
@@ -98,7 +95,7 @@ public class CassandraStreamReceiver implements StreamReceiver
         this.session = session;
         // this is an "offline" transaction, as we currently manually expose 
the sstables once done;
         // this should be revisited at a later date, so that 
LifecycleTransaction manages all sstable state changes
-        this.txn = LifecycleTransaction.offline(OperationType.STREAM);
+        this.txn = new StreamingLifecycleTransaction();
         this.ranges = ranges;
         this.sstables = new ArrayList<>(totalFiles);
         this.requiresWritePath = requiresWritePath(cfs);
@@ -122,16 +119,16 @@ public class CassandraStreamReceiver implements 
StreamReceiver
         CassandraIncomingFile file = getFile(stream);
 
         Collection<SSTableReader> finished = null;
-        SSTableMultiWriter sstable = file.getSSTable();
+        SSTableTxnSingleStreamWriter sstable = 
(SSTableTxnSingleStreamWriter)file.getSSTable();
         try
         {
-            finished = sstable.finish(true);
+            finished = sstable.transferOwnershipTo(txn);
         }
         catch (Throwable t)
         {
             Throwables.maybeFail(sstable.abort(t));
         }
-        txn.update(finished, false);
+        txn.update(finished);
         sstables.addAll(finished);
         receivedEntireSSTable = file.isEntireSSTable();
     }
@@ -143,39 +140,6 @@ public class CassandraStreamReceiver implements 
StreamReceiver
         Throwables.maybeFail(file.getSSTable().abort(null));
     }
 
-    /**
-     * @return a LifecycleNewTracker whose operations are synchronised on this 
StreamReceiveTask.
-     */
-    public synchronized LifecycleNewTracker createLifecycleNewTracker()
-    {
-        return new LifecycleNewTracker()
-        {
-            @Override
-            public void trackNew(SSTable table)
-            {
-                synchronized (CassandraStreamReceiver.this)
-                {
-                    txn.trackNew(table);
-                }
-            }
-
-            @Override
-            public void untrackNew(SSTable table)
-            {
-                synchronized (CassandraStreamReceiver.this)
-                {
-                    txn.untrackNew(table);
-                }
-            }
-
-            public OperationType opType()
-            {
-                return txn.opType();
-            }
-        };
-    }
-
-
     @Override
     public synchronized void abort()
     {
diff --git a/src/java/org/apache/cassandra/index/Index.java 
b/src/java/org/apache/cassandra/index/Index.java
index f9e2265631..8ebbc4226d 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -47,7 +47,7 @@ import org.apache.cassandra.db.ReadExecutionController;
 import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.WriteContext;
 import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.memtable.Memtable;
 import org.apache.cassandra.db.partitions.PartitionIterator;
@@ -384,11 +384,11 @@ public interface Index
      * Get flush observer to observe partition/cell events generated by 
flushing SSTable (memtable flush or compaction).
      *
      * @param descriptor The descriptor of the sstable observer is requested 
for.
-     * @param tracker The {@link LifecycleNewTracker} associated with the 
SSTable being written
+     * @param txn The {@link ILifecycleTransaction} associated with the 
SSTable being written
      *
      * @return SSTable flush observer.
      */
-    default SSTableFlushObserver getFlushObserver(Descriptor descriptor, 
LifecycleNewTracker tracker)
+    default SSTableFlushObserver getFlushObserver(Descriptor descriptor, 
ILifecycleTransaction txn)
     {
         return null;
     }
@@ -822,12 +822,12 @@ public interface Index
          * Get flush observer to observe partition/cell events generated by 
flushing SSTable (memtable flush or compaction).
          *
          * @param descriptor The descriptor of the sstable observer is 
requested for.
-         * @param tracker The {@link LifecycleNewTracker} associated with the 
SSTable being written
+         * @param txn The {@link ILifecycleTransaction} associated with the 
SSTable being written
          * @param tableMetadata The immutable metadata of the table at the 
moment the SSTable is flushed
          *
          * @return SSTable flush observer.
          */
-        SSTableFlushObserver getFlushObserver(Descriptor descriptor, 
LifecycleNewTracker tracker, TableMetadata tableMetadata);
+        SSTableFlushObserver getFlushObserver(Descriptor descriptor, 
ILifecycleTransaction txn, TableMetadata tableMetadata);
 
         /**
          * @param type index transaction type
diff --git a/src/java/org/apache/cassandra/index/IndexRegistry.java 
b/src/java/org/apache/cassandra/index/IndexRegistry.java
index 5fa91ca092..8b14ef2c9e 100644
--- a/src/java/org/apache/cassandra/index/IndexRegistry.java
+++ b/src/java/org/apache/cassandra/index/IndexRegistry.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.memtable.Memtable;
@@ -250,7 +250,7 @@ public interface IndexRegistry extends Iterable<Index>
 
             @Nullable
             @Override
-            public SSTableFlushObserver getFlushObserver(Descriptor 
descriptor, LifecycleNewTracker tracker, TableMetadata tableMetadata)
+            public SSTableFlushObserver getFlushObserver(Descriptor 
descriptor, ILifecycleTransaction txn, TableMetadata tableMetadata)
             {
                 return null;
             }
diff --git a/src/java/org/apache/cassandra/index/SingletonIndexGroup.java 
b/src/java/org/apache/cassandra/index/SingletonIndexGroup.java
index 162247fd74..501144f069 100644
--- a/src/java/org/apache/cassandra/index/SingletonIndexGroup.java
+++ b/src/java/org/apache/cassandra/index/SingletonIndexGroup.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.WriteContext;
 import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.memtable.Memtable;
 import org.apache.cassandra.index.transactions.IndexTransaction;
 import org.apache.cassandra.io.sstable.Component;
@@ -88,9 +88,9 @@ public class SingletonIndexGroup implements Index.Group
     }
 
     @Override
-    public SSTableFlushObserver getFlushObserver(Descriptor descriptor, 
LifecycleNewTracker tracker, TableMetadata tableMetadata)
+    public SSTableFlushObserver getFlushObserver(Descriptor descriptor, 
ILifecycleTransaction txn, TableMetadata tableMetadata)
     {
-        return delegate.getFlushObserver(descriptor, tracker);
+        return delegate.getFlushObserver(descriptor, txn);
     }
 
     @Override
diff --git 
a/src/java/org/apache/cassandra/index/accord/MemtableIndexManager.java 
b/src/java/org/apache/cassandra/index/accord/MemtableIndexManager.java
index bdaebc2a94..54bcb238a6 100644
--- a/src/java/org/apache/cassandra/index/accord/MemtableIndexManager.java
+++ b/src/java/org/apache/cassandra/index/accord/MemtableIndexManager.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.NavigableSet;
 
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.memtable.Memtable;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.schema.TableId;
@@ -31,7 +31,7 @@ public interface MemtableIndexManager
 {
     long index(DecoratedKey key, Row row, Memtable mt);
 
-    MemtableIndex getPendingMemtableIndex(LifecycleNewTracker tracker);
+    MemtableIndex getPendingMemtableIndex(ILifecycleTransaction txn);
 
     void discardMemtable(Memtable memtable);
 
diff --git a/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java 
b/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java
index aa97dd981e..acd014a5c4 100644
--- a/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java
+++ b/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java
@@ -49,7 +49,7 @@ import org.apache.cassandra.db.WriteContext;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.Tracker;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.Int32Type;
@@ -317,17 +317,17 @@ public class RouteJournalIndex implements Index, 
INotificationConsumer
 
     @Override
     public SSTableFlushObserver getFlushObserver(Descriptor descriptor,
-                                                 LifecycleNewTracker tracker)
+                                                 ILifecycleTransaction txn)
     {
         // mimics 
org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat.newPerColumnIndexWriter
         IndexDescriptor id = IndexDescriptor.create(descriptor, 
baseCfs.getPartitioner(), baseCfs.metadata().comparator);
-        if (tracker.opType() != OperationType.FLUSH || !initBuildStarted)
+        if (txn.opType() != OperationType.FLUSH || !initBuildStarted)
         {
             return new RouteIndexFormat.SSTableIndexWriter(this, id);
         }
         else
         {
-            return new RouteIndexFormat.MemtableRouteIndexWriter(id, 
memtableIndexManager.getPendingMemtableIndex(tracker));
+            return new RouteIndexFormat.MemtableRouteIndexWriter(id, 
memtableIndexManager.getPendingMemtableIndex(txn));
         }
     }
 
diff --git 
a/src/java/org/apache/cassandra/index/accord/RouteMemtableIndexManager.java 
b/src/java/org/apache/cassandra/index/accord/RouteMemtableIndexManager.java
index f979ea5cfe..13159d1acb 100644
--- a/src/java/org/apache/cassandra/index/accord/RouteMemtableIndexManager.java
+++ b/src/java/org/apache/cassandra/index/accord/RouteMemtableIndexManager.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.memtable.Memtable;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.schema.TableId;
@@ -73,10 +73,10 @@ public class RouteMemtableIndexManager implements 
MemtableIndexManager
     }
 
     @Override
-    public MemtableIndex getPendingMemtableIndex(LifecycleNewTracker tracker)
+    public MemtableIndex getPendingMemtableIndex(ILifecycleTransaction txn)
     {
         return liveMemtableIndexMap.keySet().stream()
-                                   .filter(m -> 
tracker.equals(m.getFlushTransaction()))
+                                   .filter(m -> 
txn.equals(m.getFlushTransaction()))
                                    .findFirst()
                                    .map(liveMemtableIndexMap::get)
                                    .orElse(null);
diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java 
b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
index c17f9fb0a9..c93923562c 100644
--- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
+++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
@@ -67,7 +67,7 @@ import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.guardrails.GuardrailViolatedException;
 import org.apache.cassandra.db.guardrails.Guardrails;
 import org.apache.cassandra.db.guardrails.MaxThreshold;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.FloatType;
 import org.apache.cassandra.db.memtable.Memtable;
@@ -541,7 +541,7 @@ public class StorageAttachedIndex implements Index
     }
 
     @Override
-    public SSTableFlushObserver getFlushObserver(Descriptor descriptor, 
LifecycleNewTracker tracker)
+    public SSTableFlushObserver getFlushObserver(Descriptor descriptor, 
ILifecycleTransaction txn)
     {
         // flush observers should be created from the index group, this is 
only used by the singleton index group
         throw new UnsupportedOperationException("Storage-attached index flush 
observers should never be created directly.");
diff --git 
a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java 
b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
index 02e7971814..28711e9abe 100644
--- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
+++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
@@ -39,7 +39,7 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.WriteContext;
 import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.Tracker;
 import org.apache.cassandra.db.memtable.Memtable;
 import org.apache.cassandra.db.rows.Row;
@@ -200,12 +200,12 @@ public class StorageAttachedIndexGroup implements 
Index.Group, INotificationCons
     }
 
     @Override
-    public SSTableFlushObserver getFlushObserver(Descriptor descriptor, 
LifecycleNewTracker tracker, TableMetadata tableMetadata)
+    public SSTableFlushObserver getFlushObserver(Descriptor descriptor, 
ILifecycleTransaction txn, TableMetadata tableMetadata)
     {
         IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor, 
tableMetadata.partitioner, tableMetadata.comparator);
         try
         {
-            return 
StorageAttachedIndexWriter.createFlushObserverWriter(indexDescriptor, indexes, 
tracker);
+            return 
StorageAttachedIndexWriter.createFlushObserverWriter(indexDescriptor, indexes, 
txn);
         }
         catch (Throwable t)
         {
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java 
b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
index 341bcaac5e..ea1a7d69c7 100644
--- 
a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
+++ 
b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
@@ -29,7 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.tries.InMemoryTrie;
@@ -63,29 +63,29 @@ public class StorageAttachedIndexWriter implements 
SSTableFlushObserver
 
     public static StorageAttachedIndexWriter 
createFlushObserverWriter(IndexDescriptor indexDescriptor,
                                                                        
Collection<StorageAttachedIndex> indexes,
-                                                                       
LifecycleNewTracker lifecycleNewTracker) throws IOException
+                                                                       
ILifecycleTransaction txn) throws IOException
     {
-        return new StorageAttachedIndexWriter(indexDescriptor, indexes, 
lifecycleNewTracker, false);
+        return new StorageAttachedIndexWriter(indexDescriptor, indexes, txn, 
false);
 
     }
 
     public static StorageAttachedIndexWriter 
createBuilderWriter(IndexDescriptor indexDescriptor,
                                                                  
Collection<StorageAttachedIndex> indexes,
-                                                                 
LifecycleNewTracker lifecycleNewTracker,
+                                                                 
ILifecycleTransaction txn,
                                                                  boolean 
perIndexComponentsOnly) throws IOException
     {
-        return new StorageAttachedIndexWriter(indexDescriptor, indexes, 
lifecycleNewTracker, perIndexComponentsOnly);
+        return new StorageAttachedIndexWriter(indexDescriptor, indexes, txn, 
perIndexComponentsOnly);
     }
 
     private StorageAttachedIndexWriter(IndexDescriptor indexDescriptor,
                                        Collection<StorageAttachedIndex> 
indexes,
-                                       LifecycleNewTracker lifecycleNewTracker,
+                                       ILifecycleTransaction txn,
                                        boolean perIndexComponentsOnly) throws 
IOException
     {
         this.indexDescriptor = indexDescriptor;
-        this.rowMapping = RowMapping.create(lifecycleNewTracker.opType());
+        this.rowMapping = RowMapping.create(txn.opType());
         this.perIndexWriters = indexes.stream().map(index -> 
indexDescriptor.newPerColumnIndexWriter(index,
-                                                                               
                      lifecycleNewTracker,
+                                                                               
                      txn,
                                                                                
                      rowMapping))
                                       .filter(Objects::nonNull) // a null here 
means the column had no data to flush
                                       .collect(Collectors.toList());
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java 
b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
index 4501aec7f9..976097b084 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
@@ -30,7 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ClusteringComparator;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.index.sai.StorageAttachedIndex;
 import org.apache.cassandra.index.sai.IndexValidation;
@@ -135,10 +135,10 @@ public class IndexDescriptor
     }
 
     public PerColumnIndexWriter newPerColumnIndexWriter(StorageAttachedIndex 
index,
-                                                        LifecycleNewTracker 
tracker,
+                                                        ILifecycleTransaction 
txn,
                                                         RowMapping rowMapping)
     {
-        return version.onDiskFormat().newPerColumnIndexWriter(index, this, 
tracker, rowMapping);
+        return version.onDiskFormat().newPerColumnIndexWriter(index, this, 
txn, rowMapping);
     }
 
     public boolean isPerSSTableIndexBuildComplete()
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java 
b/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java
index 30ba3b6295..a33a5f7837 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Set;
 
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.index.sai.SSTableContext;
 import org.apache.cassandra.index.sai.StorageAttachedIndex;
 import org.apache.cassandra.index.sai.disk.PerColumnIndexWriter;
@@ -80,19 +80,19 @@ public interface OnDiskFormat
     PerSSTableIndexWriter newPerSSTableIndexWriter(IndexDescriptor 
indexDescriptor) throws IOException;
 
     /**
-     * Create a new {@link PerColumnIndexWriter} to write the per-column 
on-disk components of an index. The {@link LifecycleNewTracker}
+     * Create a new {@link PerColumnIndexWriter} to write the per-column 
on-disk components of an index. The {@link ILifecycleTransaction}
      * is used to determine the type of index write about to happen this will 
either be an
      * {@code OperationType.FLUSH} indicating that we are about to flush a 
{@link org.apache.cassandra.index.sai.memory.MemtableIndex}
      * or one of the other operation types indicating that we will be writing 
from an existing SSTable
      *
      * @param index The {@link StorageAttachedIndex} holding the current index 
build status
      * @param indexDescriptor The {@link IndexDescriptor} for the SSTable
-     * @param tracker The {@link LifecycleNewTracker} for index build 
operation.
+     * @param txn The {@link ILifecycleTransaction} for index build operation.
      * @param rowMapping The {@link RowMapping} that is used to map rowID to 
{@code PrimaryKey} during the write operation
      */
     PerColumnIndexWriter newPerColumnIndexWriter(StorageAttachedIndex index,
                                                  IndexDescriptor 
indexDescriptor,
-                                                 LifecycleNewTracker tracker,
+                                                 ILifecycleTransaction txn,
                                                  RowMapping rowMapping);
 
     /**
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java 
b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
index 8d8266ac34..d8f5d9e631 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
 import com.codahale.metrics.Gauge;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.index.sai.SSTableContext;
 import org.apache.cassandra.index.sai.StorageAttachedIndex;
 import org.apache.cassandra.index.sai.disk.PerColumnIndexWriter;
@@ -151,11 +151,11 @@ public class V1OnDiskFormat implements OnDiskFormat
     @Override
     public PerColumnIndexWriter newPerColumnIndexWriter(StorageAttachedIndex 
index,
                                                         IndexDescriptor 
indexDescriptor,
-                                                        LifecycleNewTracker 
tracker,
+                                                        ILifecycleTransaction 
txn,
                                                         RowMapping rowMapping)
     {
         // If we're not flushing, or we haven't yet started the initialization 
build, flush from SSTable contents.
-        if (tracker.opType() != OperationType.FLUSH || 
!index.isInitBuildStarted())
+        if (txn.opType() != OperationType.FLUSH || !index.isInitBuildStarted())
         {
             NamedMemoryLimiter limiter = SEGMENT_BUILD_MEMORY_LIMITER;
             logger.info(index.identifier().logMessage("Starting a compaction 
index build. Global segment memory usage: {}"),
@@ -164,7 +164,7 @@ public class V1OnDiskFormat implements OnDiskFormat
             return new SSTableIndexWriter(indexDescriptor, index, limiter, 
index.isIndexValid());
         }
 
-        return new 
MemtableIndexWriter(index.memtableIndexManager().getPendingMemtableIndex(tracker),
+        return new 
MemtableIndexWriter(index.memtableIndexManager().getPendingMemtableIndex(txn),
                                        indexDescriptor,
                                        index.termType(),
                                        index.identifier(),
diff --git 
a/src/java/org/apache/cassandra/index/sai/memory/MemtableIndexManager.java 
b/src/java/org/apache/cassandra/index/sai/memory/MemtableIndexManager.java
index abd62dbff1..99bff5a1e1 100644
--- a/src/java/org/apache/cassandra/index/sai/memory/MemtableIndexManager.java
+++ b/src/java/org/apache/cassandra/index/sai/memory/MemtableIndexManager.java
@@ -32,7 +32,7 @@ import javax.annotation.Nullable;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.memtable.Memtable;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.index.sai.StorageAttachedIndex;
@@ -119,10 +119,10 @@ public class MemtableIndexManager
     }
 
     @Nullable
-    public MemtableIndex getPendingMemtableIndex(LifecycleNewTracker tracker)
+    public MemtableIndex getPendingMemtableIndex(ILifecycleTransaction txn)
     {
         return liveMemtableIndexMap.keySet().stream()
-                                   .filter(m -> 
tracker.equals(m.getFlushTransaction()))
+                                   .filter(m -> 
txn.equals(m.getFlushTransaction()))
                                    .findFirst()
                                    .map(liveMemtableIndexMap::get)
                                    .orElse(null);
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java 
b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
index ccfed7f5c9..b2989cb8d8 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
@@ -43,7 +43,7 @@ import org.apache.cassandra.db.WriteContext;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.Tracker;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.memtable.Memtable;
@@ -323,9 +323,9 @@ public class SASIIndex implements Index, 
INotificationConsumer
         return new SASIIndexSearcher(cfs, command, 
DatabaseDescriptor.getRangeRpcTimeout(MILLISECONDS));
     }
 
-    public SSTableFlushObserver getFlushObserver(Descriptor descriptor, 
LifecycleNewTracker tracker)
+    public SSTableFlushObserver getFlushObserver(Descriptor descriptor, 
ILifecycleTransaction txn)
     {
-        return newWriter(baseCfs.metadata().partitionKeyType, descriptor, 
Collections.singletonMap(index.getDefinition(), index), tracker.opType());
+        return newWriter(baseCfs.metadata().partitionKeyType, descriptor, 
Collections.singletonMap(index.getDefinition(), index), txn.opType());
     }
 
     public IndexBuildingSupport getBuildTaskSupport()
diff --git 
a/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java
index 422c6eaa6e..1b4fa8e8fe 100644
--- a/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.DiskBoundaries;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -47,14 +47,13 @@ public class RangeAwareSSTableWriter implements 
SSTableMultiWriter
     private final boolean isTransient;
     private final SSTableFormat<?, ?> format;
     private final SerializationHeader header;
-    private final LifecycleNewTracker lifecycleNewTracker;
+    private final ILifecycleTransaction txn;
     private int currentIndex = -1;
     public final ColumnFamilyStore cfs;
     private final List<SSTableMultiWriter> finishedWriters = new ArrayList<>();
-    private final List<SSTableReader> finishedReaders = new ArrayList<>();
     private SSTableMultiWriter currentWriter = null;
 
-    public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, 
long repairedAt, TimeUUID pendingRepair, boolean isTransient, SSTableFormat<?, 
?> format, int sstableLevel, long totalSize, LifecycleNewTracker 
lifecycleNewTracker, SerializationHeader header) throws IOException
+    public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, 
long repairedAt, TimeUUID pendingRepair, boolean isTransient, SSTableFormat<?, 
?> format, int sstableLevel, long totalSize, ILifecycleTransaction txn, 
SerializationHeader header) throws IOException
     {
         DiskBoundaries db = cfs.getDiskBoundaries();
         directories = db.directories;
@@ -65,7 +64,7 @@ public class RangeAwareSSTableWriter implements 
SSTableMultiWriter
         this.pendingRepair = pendingRepair;
         this.isTransient = isTransient;
         this.format = format;
-        this.lifecycleNewTracker = lifecycleNewTracker;
+        this.txn = txn;
         this.header = header;
         boundaries = db.positions;
         if (boundaries == null)
@@ -75,7 +74,7 @@ public class RangeAwareSSTableWriter implements 
SSTableMultiWriter
                 throw new IOException(String.format("Insufficient disk space 
to store %s",
                                                     
FBUtilities.prettyPrintMemory(totalSize)));
             Descriptor desc = 
cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(localDir), 
format);
-            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, 
repairedAt, pendingRepair, isTransient, null, sstableLevel, header, 
lifecycleNewTracker);
+            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, 
repairedAt, pendingRepair, isTransient, null, sstableLevel, header, txn);
         }
     }
 
@@ -97,7 +96,7 @@ public class RangeAwareSSTableWriter implements 
SSTableMultiWriter
                 finishedWriters.add(currentWriter);
 
             Descriptor desc = 
cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex)),
 format);
-            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, 
repairedAt, pendingRepair, isTransient, null, sstableLevel, header, 
lifecycleNewTracker);
+            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, 
repairedAt, pendingRepair, isTransient, null, sstableLevel, header, txn);
         }
     }
 
@@ -113,6 +112,7 @@ public class RangeAwareSSTableWriter implements 
SSTableMultiWriter
         if (currentWriter != null)
             finishedWriters.add(currentWriter);
         currentWriter = null;
+        List<SSTableReader> finishedReaders = new 
ArrayList<>(finishedWriters.size());
         for (SSTableMultiWriter writer : finishedWriters)
         {
             if (writer.getBytesWritten() > 0)
@@ -126,6 +126,12 @@ public class RangeAwareSSTableWriter implements 
SSTableMultiWriter
     @Override
     public Collection<SSTableReader> finished()
     {
+        List<SSTableReader> finishedReaders = new 
ArrayList<>(finishedWriters.size());
+        for (SSTableMultiWriter writer : finishedWriters)
+        {
+            if (writer != null)
+                finishedReaders.addAll(writer.finished());
+        }
         return finishedReaders;
     }
 
diff --git 
a/src/java/org/apache/cassandra/io/sstable/SSTableTxnSingleStreamWriter.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableTxnSingleStreamWriter.java
new file mode 100644
index 0000000000..390e63858f
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnSingleStreamWriter.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.StreamingLifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.Throwables;
+
+public class SSTableTxnSingleStreamWriter implements SSTableMultiWriter
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(SSTableTxnSingleStreamWriter.class);
+
+    private final ILifecycleTransaction txn;
+    private final SSTableMultiWriter writer;
+    private boolean complete = false;
+
+    public SSTableTxnSingleStreamWriter(ILifecycleTransaction txn, 
SSTableMultiWriter writer)
+    {
+        this.txn = txn;
+        this.writer = writer;
+    }
+
+    @Override
+    public void append(UnfilteredRowIterator partition)
+    {
+        writer.append(partition);
+    }
+
+    public synchronized Collection<SSTableReader> 
transferOwnershipTo(StreamingLifecycleTransaction globalTxn)
+    {
+        failIfComplete();
+        writer.setOpenResult(true);
+        writer.prepareToCommit();
+        txn.prepareToCommit();
+        globalTxn.takeOwnership(txn);
+        Throwables.maybeFail(writer.commit(txn.commit(null)));
+        complete = true;
+        return writer.finished();
+    }
+
+    @Override
+    public synchronized Throwable abort(Throwable accumulate)
+    {
+        if (complete)
+        {
+            logger.debug("Already completed writer for '{}'. Nothing to 
abort.", getFilename());
+            return accumulate;
+        }
+
+        complete = true;
+        return txn.abort(writer.abort(accumulate));
+    }
+
+    @Override
+    public synchronized void close()
+    {
+        complete = true;
+        writer.close();
+    }
+
+    private void failIfComplete()
+    {
+        if (complete)
+            throw new IllegalStateException("Writer "+getFilename()+" has 
already completed");
+    }
+
+    @Override
+    public String getFilename()
+    {
+        return writer.getFilename();
+    }
+
+    @Override
+    public long getBytesWritten()
+    {
+        return writer.getBytesWritten();
+    }
+
+    @Override
+    public long getOnDiskBytesWritten()
+    {
+        return writer.getOnDiskBytesWritten();
+    }
+
+    @Override
+    public TableId getTableId()
+    {
+        return writer.getTableId();
+    }
+
+    @Override
+    public Collection<SSTableReader> finish(boolean openResult)
+    {
+        throw new UnsupportedOperationException("SSTableTxnSingleStreamWriter 
should be finished via transferOwnershipTo");
+    }
+
+    @Override
+    public Collection<SSTableReader> finished()
+    {
+        throw new UnsupportedOperationException("SSTableTxnSingleStreamWriter 
should be finished via transferOwnershipTo");
+    }
+
+    @Override
+    public SSTableMultiWriter setOpenResult(boolean openResult)
+    {
+        throw new UnsupportedOperationException("SSTableTxnSingleStreamWriter 
should be finished via transferOwnershipTo");
+    }
+
+    @Override
+    public void prepareToCommit()
+    {
+        throw new UnsupportedOperationException("SSTableTxnSingleStreamWriter 
should be finished via transferOwnershipTo");
+    }
+
+    @Override
+    public Throwable commit(Throwable accumulate)
+    {
+        throw new UnsupportedOperationException("SSTableTxnSingleStreamWriter 
should be finished via transferOwnershipTo");
+    }
+
+}
diff --git 
a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
index 46a490974e..47566ee550 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
@@ -31,7 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Token;
@@ -56,12 +56,11 @@ public class SSTableZeroCopyWriter extends SSTable 
implements SSTableMultiWriter
     private final Map<String, ZeroCopySequentialWriter> componentWriters; // 
indexed by component name
 
     public SSTableZeroCopyWriter(Builder<?, ?> builder,
-                                 LifecycleNewTracker lifecycleNewTracker,
+                                 ILifecycleTransaction txn,
                                  SSTable.Owner owner)
     {
         super(builder, owner);
-
-        lifecycleNewTracker.trackNew(this);
+        txn.trackNew(this);
         this.componentWriters = new HashMap<>();
 
         Set<Component> unsupported = components.stream()
diff --git 
a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java 
b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index 99406dba31..6c7b7b6d7f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -23,7 +23,7 @@ import java.util.Collections;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -36,11 +36,11 @@ import org.apache.cassandra.utils.TimeUUID;
 public class SimpleSSTableMultiWriter implements SSTableMultiWriter
 {
     private final SSTableWriter writer;
-    private final LifecycleNewTracker lifecycleNewTracker;
+    private final ILifecycleTransaction txn;
 
-    protected SimpleSSTableMultiWriter(SSTableWriter writer, 
LifecycleNewTracker lifecycleNewTracker)
+    protected SimpleSSTableMultiWriter(SSTableWriter writer, 
ILifecycleTransaction txn)
     {
-        this.lifecycleNewTracker = lifecycleNewTracker;
+        this.txn = txn;
         this.writer = writer;
     }
 
@@ -92,7 +92,7 @@ public class SimpleSSTableMultiWriter implements 
SSTableMultiWriter
 
     public Throwable abort(Throwable accumulate)
     {
-        lifecycleNewTracker.untrackNew(writer);
+        txn.untrackNew(writer);
         return writer.abort(accumulate);
     }
 
@@ -116,7 +116,7 @@ public class SimpleSSTableMultiWriter implements 
SSTableMultiWriter
                                             int sstableLevel,
                                             SerializationHeader header,
                                             Collection<Index.Group> 
indexGroups,
-                                            LifecycleNewTracker 
lifecycleNewTracker,
+                                            ILifecycleTransaction txn,
                                             SSTable.Owner owner)
     {
         MetadataCollector metadataCollector = new 
MetadataCollector(metadata.get().comparator)
@@ -132,7 +132,7 @@ public class SimpleSSTableMultiWriter implements 
SSTableMultiWriter
                                             .setSerializationHeader(header)
                                             .addDefaultComponents(indexGroups)
                                             
.setSecondaryIndexGroups(indexGroups)
-                                            .build(lifecycleNewTracker, owner);
-        return new SimpleSSTableMultiWriter(writer, lifecycleNewTracker);
+                                            .build(txn, owner);
+        return new SimpleSSTableMultiWriter(writer, txn);
     }
 }
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
index 654880c2c1..6ffaa8a7ba 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
@@ -24,7 +24,6 @@ import javax.annotation.Nonnull;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
@@ -137,7 +136,7 @@ public interface SSTableFormat<R extends SSTableReader, W 
extends SSTableWriter>
         /**
          * Returns a new builder which can create instance of {@link 
SSTableWriter} with the provided parameters.
          * Similarly to the loading builder, it should open the required 
resources when
-         * the {@link SSTableWriter.Builder#build(LifecycleNewTracker, 
SSTable.Owner)} method is called.
+         * the {@link 
SSTableWriter.Builder#build(org.apache.cassandra.db.lifecycle.ILifecycleTransaction,
 SSTable.Owner)} method is called.
          * It should not let the caller passing any closeable resources 
directly, that is, via setters.
          * If building fails, all the opened resources should be released.
          */
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 38efd1955e..a3f24c0e5b 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Token;
@@ -63,7 +63,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * A root class for a writer implementation. A writer must be created by 
passing an implementation-specific
- * {@link Builder}, a {@link LifecycleNewTracker} and {@link SSTable.Owner} 
instances. Implementing classes should
+ * {@link Builder}, a {@link ILifecycleTransaction} and {@link SSTable.Owner} 
instances. Implementing classes should
  * not extend that list and all the additional properties should be included 
in the builder.
  */
 public abstract class SSTableWriter extends SSTable implements Transactional
@@ -80,7 +80,7 @@ public abstract class SSTableWriter extends SSTable 
implements Transactional
     protected final List<SSTableFlushObserver> observers;
     protected final MmappedRegionsCache mmappedRegionsCache;
     protected final TransactionalProxy txnProxy = txnProxy();
-    protected final LifecycleNewTracker lifecycleNewTracker;
+    protected final ILifecycleTransaction txn;
     protected DecoratedKey first;
     protected DecoratedKey last;
 
@@ -90,7 +90,7 @@ public abstract class SSTableWriter extends SSTable 
implements Transactional
      */
     protected abstract TransactionalProxy txnProxy();
 
-    protected SSTableWriter(Builder<?, ?> builder, LifecycleNewTracker 
lifecycleNewTracker, SSTable.Owner owner)
+    protected SSTableWriter(Builder<?, ?> builder, ILifecycleTransaction txn, 
SSTable.Owner owner)
     {
         super(builder, owner);
         checkNotNull(builder.getIndexGroups());
@@ -104,7 +104,7 @@ public abstract class SSTableWriter extends SSTable 
implements Transactional
         this.metadataCollector = builder.getMetadataCollector();
         this.header = builder.getSerializationHeader();
         this.mmappedRegionsCache = builder.getMmappedRegionsCache();
-        this.lifecycleNewTracker = lifecycleNewTracker;
+        this.txn = txn;
 
         // We need to ensure that no sstable components exist before the 
lifecycle transaction starts tracking it.
         // Otherwise, it means that we either want to overwrite some existing 
sstable, which is not allowed, or some
@@ -116,7 +116,7 @@ public abstract class SSTableWriter extends SSTable 
implements Transactional
                                                             
descriptor.directory,
                                                             
existingComponents);
 
-        lifecycleNewTracker.trackNew(this);
+        txn.trackNew(this);
 
         try
         {
@@ -124,7 +124,7 @@ public abstract class SSTableWriter extends SSTable 
implements Transactional
             this.observers = Collections.unmodifiableList(observers);
             for (Index.Group group : builder.getIndexGroups())
             {
-                SSTableFlushObserver observer = 
group.getFlushObserver(descriptor, lifecycleNewTracker, metadata.getLocal());
+                SSTableFlushObserver observer = 
group.getFlushObserver(descriptor, txn, metadata.getLocal());
                 if (observer != null)
                 {
                     observer.begin();
@@ -146,7 +146,7 @@ public abstract class SSTableWriter extends SSTable 
implements Transactional
      * The caught exception should be then rethrown so the {@link Builder} can 
handle it and close any resources opened
      * implicitly by the builder.
      * <p>
-     * See {@link 
SortedTableWriter#SortedTableWriter(SortedTableWriter.Builder, 
LifecycleNewTracker, Owner)} as of CASSANDRA-18737.
+     * See {@link 
SortedTableWriter#SortedTableWriter(SortedTableWriter.Builder, 
ILifecycleTransaction, Owner)} as of CASSANDRA-18737.
      *
      * @param ex the exception thrown during the construction
      */
@@ -156,7 +156,7 @@ public abstract class SSTableWriter extends SSTable 
implements Transactional
         for (int i = observers.size()-1; i >= 0; i--)
             observers.get(i).abort(ex);
         descriptor.getFormat().deleteOrphanedComponents(descriptor, 
components);
-        lifecycleNewTracker.untrackNew(this);
+        txn.untrackNew(this);
     }
 
     @Override
@@ -422,7 +422,7 @@ public abstract class SSTableWriter extends SSTable 
implements Transactional
     /**
      * A builder of this sstable writer. It should be extended for each 
implementation with the specific fields.
      *
-     * An implementation should open all the resources when {@link 
#build(LifecycleNewTracker, Owner)} and pass them
+     * An implementation should open all the resources when {@link 
#build(ILifecycleTransaction, Owner)} and pass them
      * in builder fields to the writer, so that the writer can access them via 
getters.
      *
      * @param <W> type of the sstable writer to be build with this builder
@@ -557,20 +557,20 @@ public abstract class SSTableWriter extends SSTable 
implements Transactional
             super(descriptor);
         }
 
-        public W build(LifecycleNewTracker lifecycleNewTracker, Owner owner)
+        public W build(ILifecycleTransaction txn, Owner owner)
         {
             checkNotNull(getComponents());
 
             validateRepairedMetadata(getRepairedAt(), getPendingRepair(), 
isTransientSSTable());
 
-            return buildInternal(lifecycleNewTracker, owner);
+            return buildInternal(txn, owner);
         }
 
-        protected abstract W buildInternal(LifecycleNewTracker 
lifecycleNewTracker, Owner owner);
+        protected abstract W buildInternal(ILifecycleTransaction txn, Owner 
owner);
 
-        public SSTableZeroCopyWriter createZeroCopyWriter(LifecycleNewTracker 
lifecycleNewTracker, Owner owner)
+        public SSTableZeroCopyWriter 
createZeroCopyWriter(ILifecycleTransaction txn, Owner owner)
         {
-            return new SSTableZeroCopyWriter(this, lifecycleNewTracker, owner);
+            return new SSTableZeroCopyWriter(this, txn, owner);
         }
     }
 }
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
index 6fba07ba1e..5ccaf26710 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.db.DeletionPurger;
 import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.guardrails.Guardrails;
 import org.apache.cassandra.db.guardrails.Threshold;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.rows.ComplexColumnData;
 import org.apache.cassandra.db.rows.PartitionSerializationException;
 import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
@@ -88,9 +88,9 @@ public abstract class SortedTableWriter<P extends 
SortedTablePartitionWriter, I
     private long lastEarlyOpenLength;
     private final Supplier<Double> crcCheckChanceSupplier;
 
-    public SortedTableWriter(Builder<P, I, ?, ?> builder, LifecycleNewTracker 
lifecycleNewTracker, SSTable.Owner owner)
+    public SortedTableWriter(Builder<P, I, ?, ?> builder, 
ILifecycleTransaction txn, SSTable.Owner owner)
     {
-        super(builder, lifecycleNewTracker, owner);
+        super(builder, txn, owner);
 
         TableMetadataRef ref = builder.getTableMetadataRef();
         crcCheckChanceSupplier = () -> ref.getLocal().params.crcCheckChance;
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 1cd63e56b1..3233ca4c06 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
@@ -74,16 +73,15 @@ public class BigTableWriter extends 
SortedTableWriter<BigFormatPartitionWriter,
     private final Map<DecoratedKey, AbstractRowIndexEntry> cachedKeys = new 
HashMap<>();
     private final boolean shouldMigrateKeyCache;
 
-    public BigTableWriter(Builder builder, LifecycleNewTracker 
lifecycleNewTracker, SSTable.Owner owner)
+    public BigTableWriter(Builder builder, ILifecycleTransaction txn, 
SSTable.Owner owner)
     {
-        super(builder, lifecycleNewTracker, owner);
+        super(builder, txn, owner);
 
         this.rowIndexEntrySerializer = builder.getRowIndexEntrySerializer();
         checkNotNull(this.rowIndexEntrySerializer);
 
         this.shouldMigrateKeyCache = 
DatabaseDescriptor.shouldMigrateKeycacheOnCompaction()
-                                     && lifecycleNewTracker instanceof 
ILifecycleTransaction
-                                     && !((ILifecycleTransaction) 
lifecycleNewTracker).isOffline();
+                                     && !txn.isOffline();
     }
 
     @Override
@@ -114,7 +112,7 @@ public class BigTableWriter extends 
SortedTableWriter<BigFormatPartitionWriter,
 
         if (shouldMigrateKeyCache)
         {
-            for (SSTableReader reader : ((ILifecycleTransaction) 
lifecycleNewTracker).originals())
+            for (SSTableReader reader : txn.originals())
             {
                 if (reader instanceof KeyCacheSupport<?> && 
((KeyCacheSupport<?>) reader).getCachedPosition(key, false) != null)
                 {
@@ -431,14 +429,14 @@ public class BigTableWriter extends 
SortedTableWriter<BigFormatPartitionWriter,
         }
 
         @Override
-        protected BigTableWriter buildInternal(LifecycleNewTracker 
lifecycleNewTracker, Owner owner)
+        protected BigTableWriter buildInternal(ILifecycleTransaction txn, 
Owner owner)
         {
             try
             {
-                this.operationType = lifecycleNewTracker.opType();
+                this.operationType = txn.opType();
                 this.mmappedRegionsCache = new MmappedRegionsCache();
                 this.rowIndexEntrySerializer = new 
RowIndexEntry.Serializer(descriptor.version, getSerializationHeader(), owner != 
null ? owner.getMetrics() : null);
-                return new BigTableWriter(this, lifecycleNewTracker, owner);
+                return new BigTableWriter(this, txn, owner);
             }
             catch (RuntimeException | Error ex)
             {
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java
index 7aad38511f..074c5c1708 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
@@ -65,9 +65,9 @@ public class BtiTableWriter extends 
SortedTableWriter<BtiFormatPartitionWriter,
 {
     private static final Logger logger = 
LoggerFactory.getLogger(BtiTableWriter.class);
 
-    public BtiTableWriter(Builder builder, LifecycleNewTracker 
lifecycleNewTracker, SSTable.Owner owner)
+    public BtiTableWriter(Builder builder, ILifecycleTransaction txn, 
SSTable.Owner owner)
     {
-        super(builder, lifecycleNewTracker, owner);
+        super(builder, txn, owner);
     }
 
     @Override
@@ -371,14 +371,14 @@ public class BtiTableWriter extends 
SortedTableWriter<BtiFormatPartitionWriter,
         }
 
         @Override
-        protected BtiTableWriter buildInternal(LifecycleNewTracker 
lifecycleNewTracker, Owner owner)
+        protected BtiTableWriter buildInternal(ILifecycleTransaction txn, 
Owner owner)
         {
             try
             {
                 this.mmappedRegionsCache = new MmappedRegionsCache();
-                this.operationType = lifecycleNewTracker.opType();
+                this.operationType = txn.opType();
 
-                return new BtiTableWriter(this, lifecycleNewTracker, owner);
+                return new BtiTableWriter(this, txn, owner);
             }
             catch (RuntimeException | Error ex)
             {
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStream.java 
b/src/java/org/apache/cassandra/streaming/IncomingStream.java
index 25ab62630c..22141ef85f 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStream.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStream.java
@@ -36,7 +36,7 @@ public interface IncomingStream
      * Read in the stream data.
      */
     void read(DataInputPlus inputPlus, int version) throws Throwable;
-
+    default Throwable abort(Throwable t) { return t; }
     String getName();
     long getSize();
     int getNumFiles();
diff --git 
a/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java 
b/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
index 2d05322543..a9ddabb689 100644
--- a/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.guardrails.GuardrailViolatedException;
 import org.apache.cassandra.db.guardrails.Guardrails;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
 import org.apache.cassandra.streaming.messages.KeepAliveMessage;
 import org.apache.cassandra.streaming.messages.StreamMessage;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -54,9 +55,9 @@ public class StreamDeserializingTask implements Runnable
     public void run()
     {
         StreamingDataInputPlus input = channel.in();
+        StreamMessage message = null;
         try
         {
-            StreamMessage message;
             while (null != (message = StreamMessage.deserialize(input, 
messagingVersion)))
             {
                 // keep-alives don't necessarily need to be tied to a session 
(they could be arrive before or after
@@ -93,6 +94,8 @@ public class StreamDeserializingTask implements Runnable
         catch (Throwable t)
         {
             JVMStabilityInspector.inspectThrowable(t);
+            if ((session == null || session.isFailedOrAborted()) && message 
instanceof IncomingStreamMessage)
+                t = ((IncomingStreamMessage) message).stream.abort(t);
             if (session != null)
             {
                 session.onError(t);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/streaming/IndexBuildFailsAfterStreamingTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/IndexBuildFailsAfterStreamingTest.java
new file mode 100644
index 0000000000..767689f8cc
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/IndexBuildFailsAfterStreamingTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.index.SecondaryIndexManager;
+
+import static net.bytebuddy.implementation.MethodDelegation.to;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesNoArguments;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class IndexBuildFailsAfterStreamingTest extends TestBaseImpl
+{
+    @Test
+    public void test() throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           
.withInstanceInitializer(BBHelper::install)
+                                           .withConfig(c -> 
c.with(Feature.values())
+                                                             
.set("stream_entire_sstables", false)
+                                                             
.set("disk_failure_policy", "die"))
+                                           .start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (p int, c 
int, v int, PRIMARY KEY(p, c))"));
+            cluster.schemaChange(withKeyspace("CREATE INDEX idx ON 
%s.tbl(v)"));
+
+            for (int i = 0; i < 100; i++)
+                cluster.get(1).executeInternal(withKeyspace("insert into 
%s.tbl (p, c, v) values (?, ?, ?)"), i, i, i);
+            cluster.get(1).flush(KEYSPACE);
+            cluster.get(2).runOnInstance(() -> BBHelper.enabled.set(true));
+
+            // pre-existing weird behaivour - nodetool repair fails, but the 
sstables are actually streamed & live on node2:
+            cluster.get(2).runOnInstance(() -> 
assertTrue(Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().isEmpty()));
+            cluster.get(1).nodetoolResult("repair", 
KEYSPACE).asserts().failure();
+            cluster.get(2).runOnInstance(() -> 
assertFalse(Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().isEmpty()));
+            for (int i = 0; i < 100; i++)
+                assertRows(cluster.get(2).executeInternal(withKeyspace("select 
* from %s.tbl where p = ? and c = ?"), i, i), row(i, i, i));
+
+            assertRows(cluster.get(1).executeInternal("select * from 
system.\"IndexInfo\" where table_name=? and index_name=?", KEYSPACE, "idx"), 
row(KEYSPACE, "idx", null));
+            // index not built:
+            assertEquals(0, cluster.get(2).executeInternal("select * from 
system.\"IndexInfo\" where table_name=? and index_name=?", KEYSPACE, 
"idx").length);
+        }
+    }
+
+    public static class BBHelper
+    {
+        public static void install(ClassLoader classLoader, int num)
+        {
+            if (num == 2)
+            {
+                new ByteBuddy().rebase(SecondaryIndexManager.class)
+                               
.method(named("calculateIndexingPageSize").and(takesNoArguments()))
+                               .intercept(to(BBHelper.class))
+                               .make()
+                               .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+            }
+        }
+        public static AtomicBoolean enabled = new AtomicBoolean();
+        public static int calculateIndexingPageSize(@SuperCall 
Callable<Integer> zuper) throws Exception
+        {
+            if (enabled.get())
+                throw new RuntimeException("On purpose fail 2i build");
+            return zuper.call();
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedAfterReceivingTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedAfterReceivingTest.java
new file mode 100644
index 0000000000..acffd1eee1
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedAfterReceivingTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.utils.concurrent.CountDownLatch;
+
+import static net.bytebuddy.implementation.MethodDelegation.to;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+public class StreamFailedAfterReceivingTest extends TestBaseImpl
+{
+    @Test
+    public void zcsTest() throws IOException, ExecutionException, 
InterruptedException
+    {
+        leftoverFilesTest(true);
+    }
+
+    @Test
+    public void nozcsTest() throws IOException, ExecutionException, 
InterruptedException
+    {
+        leftoverFilesTest(false);
+    }
+
+    public void leftoverFilesTest(boolean zcs) throws IOException, 
ExecutionException, InterruptedException
+    {
+        try (Cluster cluster = Cluster.build(2)
+                                      
.withInstanceInitializer(BBHelper::install)
+                                      .withConfig(c -> c.with(Feature.values())
+                                                        
.set("stream_entire_sstables", zcs)
+                                                        
.set("autocompaction_on_startup_enabled", false)
+                                                        
.set("disk_failure_policy", "die"))
+                                      .start())
+        {
+            init(cluster);
+
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int 
PRIMARY KEY, x int)"));
+            cluster.forEach(i -> 
i.nodetoolResult("disableautocompaction").asserts().success());
+            IInvokableInstance node1 = cluster.get(1);
+            IInvokableInstance node2 = cluster.get(2);
+            for (int i = 1; i <= 1000000; i++)
+            {
+                node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, x) 
VALUES (?,?)"), i, i);
+                if (i % 100000 == 0)
+                    node1.flush(KEYSPACE);
+            }
+            node1.flush(KEYSPACE);
+            node2.runOnInstance(() -> BBHelper.enabled.set(true));
+            cluster.setUncaughtExceptionsFilter((e) -> 
e.getClass().getName().contains("TransactionAlreadyCompletedException"));
+            node1.nodetoolResult("repair", "-pr", "-full", KEYSPACE, 
"tbl").asserts().failure();
+            node2.runOnInstance(() -> BBHelper.cdl.awaitUninterruptibly());
+            node2.runOnInstance(() -> BBHelper.enabled.set(false));
+            node2.shutdown().get();
+            node2.startup();
+        }
+    }
+
+
+    public static class BBHelper
+    {
+        public static void install(ClassLoader classLoader, Integer num)
+        {
+            if (num == 2)
+            {
+                // in this case we need to throw after trackNew:ing the 
sstable, but before it is finished
+                new ByteBuddy().rebase(LifecycleTransaction.class)
+                               
.method(named("trackNew").and(takesArguments(1)))
+                               
.intercept(to(StreamFailedAfterReceivingTest.BBHelper.class))
+                               .make()
+                               .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+            }
+        }
+
+        static AtomicInteger waiting = new AtomicInteger();
+        static AtomicBoolean enabled = new AtomicBoolean();
+        static CountDownLatch cdl = CountDownLatch.newCountDownLatch(1);
+
+        public static void trackNew(SSTable sstable, @SuperCall Callable<Void> 
zuper) throws Exception
+        {
+            zuper.call();
+            if (enabled.get())
+            {
+                if (waiting.incrementAndGet() > 4)
+                    throw new RuntimeException();
+
+                // using a sleep instead of a horrible nesting of latches - 
this should
+                // not make the test flaky, just might flakily pass without 
hitting the
+                // right condition
+                Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+                cdl.decrement();
+            }
+        }
+    }
+
+    @Test
+    public void basicStreamTest() throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                      .withConfig(c -> c.with(Feature.values())
+                                                        
.set("stream_entire_sstables", false)
+                                                        
.set("autocompaction_on_startup_enabled", false)
+                                                        
.set("disk_failure_policy", "die"))
+                                      .start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int 
PRIMARY KEY, x int)"));
+            cluster.forEach(i -> 
i.nodetoolResult("disableautocompaction").asserts().success());
+            IInvokableInstance node1 = cluster.get(1);
+            IInvokableInstance node2 = cluster.get(2);
+            for (int i = 1; i <= 1000; i++)
+            {
+                node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, x) 
VALUES (?,?)"), i, i);
+                if (i % 100 == 0)
+                    node1.flush(KEYSPACE);
+            }
+            node1.flush(KEYSPACE);
+
+            node1.nodetoolResult("repair", "-pr", "-full", KEYSPACE, 
"tbl").asserts().success();
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedAfterTransferTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedAfterTransferTest.java
new file mode 100644
index 0000000000..b91cbf60a9
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedAfterTransferTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.util.File;
+
+import static net.bytebuddy.implementation.MethodDelegation.to;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.junit.Assert.assertEquals;
+
+public class StreamFailedAfterTransferTest extends TestBaseImpl
+{
+    @Test
+    public void throwAfterTransferOwnership() throws IOException
+    {
+        try (Cluster cluster = Cluster.build(2)
+                                      
.withInstanceInitializer(BBHelper::install)
+                                      .withConfig(c -> c.with(Feature.values())
+                                                        
.set("stream_entire_sstables", false)
+                                                        
.set("disk_failure_policy", "die"))
+                                      .start())
+        {
+            init(cluster);
+
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int 
PRIMARY KEY, x int) with compaction = {'class':'SizeTieredCompactionStrategy', 
'enabled':'false'}"));
+
+            IInvokableInstance node1 = cluster.get(1);
+            IInvokableInstance node2 = cluster.get(2);
+            for (int i = 1; i <= 1000; i++)
+            {
+                node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, x) 
VALUES (?,?)"), i, i);
+                if (i % 100 == 0)
+                    node1.flush(KEYSPACE);
+            }
+            node1.flush(KEYSPACE);
+            cluster.setUncaughtExceptionsFilter((e) -> 
e.getClass().getName().contains("TransactionAlreadyCompletedException"));
+            node1.nodetoolResult("repair", "-pr", "-full", KEYSPACE, 
"tbl").asserts().failure();
+
+            node2.runOnInstance(() -> {
+                ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+                LifecycleTransaction.waitForDeletions();
+                for (File f : cfs.getDirectories().getCFDirectories())
+                {
+                    try
+                    {
+                        int i = 0;
+                        while (f.list().length > 0 && i++ < 20)
+                        {
+                            Uninterruptibles.sleepUninterruptibly(1, 
TimeUnit.SECONDS);
+                            LifecycleTransaction.waitForDeletions();
+                        }
+                        File [] files = f.list();
+                        assertEquals(Arrays.toString(files), 0, files.length);
+
+                    }
+                    catch (IOException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+        }
+    }
+
+
+
+    public static class BBHelper
+    {
+        public static void install(ClassLoader classLoader, Integer num)
+        {
+            if (num == 2)
+            {
+                // in this case we need to throw after trackNew:ing the 
sstable, but before it is finished
+                new ByteBuddy().rebase(LifecycleTransaction.class)
+                               
.method(named("takeOwnership").and(takesArguments(1)))
+                               .intercept(to(BBHelper.class))
+                               .make()
+                               .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+            }
+        }
+
+        static AtomicInteger cnt = new AtomicInteger();
+        public static void takeOwnership(ILifecycleTransaction txn, @SuperCall 
Callable<Void> zuper) throws Exception
+        {
+            zuper.call();
+            if (cnt.incrementAndGet() > 3)
+                throw new RuntimeException();
+        }
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java 
b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
index fb3cccace2..53459a4af6 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
@@ -20,8 +20,10 @@ package org.apache.cassandra.db.lifecycle;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -33,6 +35,7 @@ import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState;
 import 
org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState.Action;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.MockSchema;
 import org.apache.cassandra.utils.Pair;
@@ -200,6 +203,54 @@ public class LifecycleTransactionTest extends 
AbstractTransactionalTest
         Assert.assertTrue(failed);
     }
 
+    @Test
+    public void testTransferAbort()
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        List<SSTableReader> readers = readers(0, 4, cfs);
+        LifecycleTransaction sharedTxn = 
LifecycleTransaction.offline(OperationType.UNKNOWN);
+        LifecycleTransaction txn = 
LifecycleTransaction.offline(OperationType.UNKNOWN);
+        readers.forEach(txn::trackNew);
+        txn.prepareToCommit();
+        sharedTxn.takeOwnership(txn);
+        txn.commit();
+        sharedTxn.abort();
+        assertFilesGone(readers);
+    }
+
+    private void assertFilesGone(List<SSTableReader> readers)
+    {
+        readers.forEach(s -> {
+            int i = 0;
+            while 
(s.descriptor.fileFor(SSTableFormat.Components.DATA).exists() && i++ < 20)
+            {
+                Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+                LifecycleTransaction.waitForDeletions();
+            }
+            
Assert.assertFalse(s.descriptor.fileFor(SSTableFormat.Components.DATA).exists());
+        });
+    }
+
+    @Test
+    public void testTransferAbortEarly()
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        List<SSTableReader> readers = readers(0, 4, cfs);
+        LifecycleTransaction sharedTxn = 
LifecycleTransaction.offline(OperationType.UNKNOWN);
+        LifecycleTransaction txn = 
LifecycleTransaction.offline(OperationType.UNKNOWN);
+        readers.forEach(txn::trackNew);
+        txn.prepareToCommit();
+        txn.abort();
+        try
+        {
+            sharedTxn.takeOwnership(txn);
+            Assert.fail("child txn is aborted, we should not take ownership");
+        }
+        catch (Exception ignored) {}
+
+        assertFilesGone(readers);
+    }
+
     private static void testBadUpdate(LifecycleTransaction txn, SSTableReader 
update, boolean original)
     {
         boolean failed = false;
diff --git 
a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
 
b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
index abb6c2dc1b..a87878c70b 100644
--- 
a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
+++ 
b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
@@ -40,8 +40,9 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.lifecycle.StreamingLifecycleTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -163,10 +164,11 @@ public class CassandraEntireSSTableStreamWriterTest
 
             CassandraEntireSSTableStreamReader reader = new 
CassandraEntireSSTableStreamReader(new 
StreamMessageHeader(sstable.metadata().id, peer, session.planId(), false, 0, 0, 
0, null), header, session);
 
-            SSTableMultiWriter sstableWriter = reader.read(new 
DataInputBuffer(serializedFile.nioBuffer(), false));
-            Collection<SSTableReader> newSstables = sstableWriter.finished();
-
+            SSTableTxnSingleStreamWriter sstableWriter = 
(SSTableTxnSingleStreamWriter) reader.read(new 
DataInputBuffer(serializedFile.nioBuffer(), false));
+            StreamingLifecycleTransaction stt = new 
StreamingLifecycleTransaction();
+            Collection<SSTableReader> newSstables = 
sstableWriter.transferOwnershipTo(stt);
             assertEquals(1, newSstables.size());
+            stt.abort();
         }
     }
 
diff --git 
a/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
 
b/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
index 55793c9cac..3f19da0ad7 100644
--- 
a/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
+++ 
b/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
@@ -52,10 +52,12 @@ import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.StreamingLifecycleTransaction;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter;
 import org.apache.cassandra.io.sstable.SSTableUtils;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.indexsummary.IndexSummaryManager;
@@ -236,9 +238,11 @@ public class 
EntireSSTableStreamConcurrentComponentMutationTest
         {
             CassandraStreamHeader header = 
CassandraStreamHeader.serializer.deserialize(in, 
MessagingService.current_version);
             CassandraEntireSSTableStreamReader reader = new 
CassandraEntireSSTableStreamReader(messageHeader, header, session);
-            SSTableReader streamedSSTable = 
Iterables.getOnlyElement(reader.read(in).finished());
-
+            StreamingLifecycleTransaction stt = new 
StreamingLifecycleTransaction();
+            SSTableTxnSingleStreamWriter writer = 
(SSTableTxnSingleStreamWriter) reader.read(in);
+            SSTableReader streamedSSTable = 
Iterables.getOnlyElement(writer.transferOwnershipTo(stt));
             SSTableUtils.assertContentEquals(sstable, streamedSSTable);
+            stt.abort();
         }
     }
 
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java 
b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
index 483d109ec5..d6ce95a90e 100644
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -56,7 +56,6 @@ import 
org.apache.cassandra.cql3.restrictions.StatementRestrictions;
 import org.apache.cassandra.cql3.statements.ModificationStatement;
 import org.apache.cassandra.db.ColumnFamilyStore.FlushReason;
 import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.apache.cassandra.cql3.statements.schema.IndexTarget;
 import org.apache.cassandra.db.CassandraWriteContext;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -68,6 +67,7 @@ import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadExecutionController;
 import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.WriteContext;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UTF8Type;
@@ -1287,7 +1287,7 @@ public class CustomIndexTest extends CQLTester
         }
 
         @Override
-        public SSTableFlushObserver getFlushObserver(Descriptor descriptor, 
LifecycleNewTracker tracker)
+        public SSTableFlushObserver getFlushObserver(Descriptor descriptor, 
ILifecycleTransaction txn)
         {
             return new SSTableFlushObserver() {
 
@@ -1672,11 +1672,11 @@ public class CustomIndexTest extends CQLTester
             }
 
             @Override
-            public SSTableFlushObserver getFlushObserver(Descriptor 
descriptor, LifecycleNewTracker tracker, TableMetadata tableMetadata)
+            public SSTableFlushObserver getFlushObserver(Descriptor 
descriptor, ILifecycleTransaction txn, TableMetadata tableMetadata)
             {
                 Set<SSTableFlushObserver> observers = indexes.values()
                                                              .stream()
-                                                             .map(i -> 
i.getFlushObserver(descriptor, tracker))
+                                                             .map(i -> 
i.getFlushObserver(descriptor, txn))
                                                              
.filter(Objects::nonNull)
                                                              
.collect(Collectors.toSet());
 
diff --git a/test/unit/org/apache/cassandra/index/StubIndexGroup.java 
b/test/unit/org/apache/cassandra/index/StubIndexGroup.java
index 22dfbe262b..ec7f457d34 100644
--- a/test/unit/org/apache/cassandra/index/StubIndexGroup.java
+++ b/test/unit/org/apache/cassandra/index/StubIndexGroup.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.WriteContext;
 import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.memtable.Memtable;
 import org.apache.cassandra.index.transactions.IndexTransaction;
 import org.apache.cassandra.io.sstable.Component;
@@ -93,7 +93,7 @@ public class StubIndexGroup implements Index.Group
     }
 
     @Override
-    public SSTableFlushObserver getFlushObserver(Descriptor descriptor, 
LifecycleNewTracker tracker, TableMetadata tableMetadata)
+    public SSTableFlushObserver getFlushObserver(Descriptor descriptor, 
ILifecycleTransaction txn, TableMetadata tableMetadata)
     {
         return null;
     }
diff --git a/test/unit/org/apache/cassandra/io/sstable/ScrubTest.java 
b/test/unit/org/apache/cassandra/io/sstable/ScrubTest.java
index 323cd01fbb..bd8f846572 100644
--- a/test/unit/org/apache/cassandra/io/sstable/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/ScrubTest.java
@@ -69,7 +69,7 @@ import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UUIDType;
@@ -847,9 +847,9 @@ public class ScrubTest
 
     private static class TestMultiWriter extends SimpleSSTableMultiWriter
     {
-        TestMultiWriter(SSTableWriter writer, LifecycleNewTracker 
lifecycleNewTracker)
+        TestMultiWriter(SSTableWriter writer, ILifecycleTransaction txn)
         {
-            super(writer, lifecycleNewTracker);
+            super(writer, txn);
         }
     }
 
diff --git a/test/unit/org/apache/cassandra/streaming/StreamReaderTest.java 
b/test/unit/org/apache/cassandra/streaming/StreamReaderTest.java
index 0a854c0744..c156048276 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamReaderTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamReaderTest.java
@@ -51,6 +51,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
+import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.Version;
@@ -499,7 +500,7 @@ public class StreamReaderTest
             super(header, streamHeader, session);
         }
 
-        protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long 
totalSize, long repairedAt, TimeUUID pendingRepair, SSTableFormat<?,?> format) 
throws IOException
+        protected SSTableTxnSingleStreamWriter createWriter(ColumnFamilyStore 
cfs, long totalSize, long repairedAt, TimeUUID pendingRepair, 
SSTableFormat<?,?> format) throws IOException
         {
             return super.createWriter(cfs, totalSize, repairedAt, 
pendingRepair, format);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to