Merge branch 'cassandra-3.11' into trunk

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/33eada06
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/33eada06
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/33eada06

Branch: refs/heads/trunk
Commit: 33eada06a6dd3529da644377dba180795f522176
Parents: 4dd7faa 732c43b
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Mon Dec 10 15:07:18 2018 +0000
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Mon Dec 10 15:07:18 2018 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  8 ++--
 .../compaction/AbstractCompactionStrategy.java  |  5 ++-
 .../db/compaction/AbstractStrategyHolder.java   |  3 +-
 .../db/compaction/CompactionStrategyHolder.java |  5 ++-
 .../compaction/CompactionStrategyManager.java   |  5 ++-
 .../db/compaction/PendingRepairHolder.java      |  5 ++-
 .../db/lifecycle/LifecycleNewTracker.java       | 47 ++++++++++++++++++++
 .../db/lifecycle/LifecycleTransaction.java      |  7 ++-
 .../apache/cassandra/db/lifecycle/LogFile.java  | 24 ++++------
 .../cassandra/db/lifecycle/LogTransaction.java  |  4 +-
 .../CassandraEntireSSTableStreamReader.java     |  5 ++-
 .../db/streaming/CassandraStreamReader.java     |  5 ++-
 .../db/streaming/CassandraStreamReceiver.java   | 47 ++++++++++++++++----
 .../io/sstable/SimpleSSTableMultiWriter.java    | 16 +++----
 .../sstable/format/RangeAwareSSTableWriter.java | 12 ++---
 .../io/sstable/format/SSTableWriter.java        | 20 ++++-----
 .../io/sstable/format/big/BigFormat.java        |  6 +--
 .../io/sstable/format/big/BigTableWriter.java   |  6 +--
 .../format/big/BigTableZeroCopyWriter.java      |  5 ++-
 .../locator/AbstractReplicationStrategy.java    |  2 +-
 .../cassandra/streaming/StreamReceiveTask.java  |  2 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |  5 ++-
 23 files changed, 166 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
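The central change in this diffstat: every createSSTableMultiWriter signature that previously took a LifecycleTransaction now takes the narrower LifecycleNewTracker interface, so streaming can hand sstable writers a synchronised tracker instead of the raw shared transaction. The new file LifecycleNewTracker.java is not reproduced in this excerpt; a plausible minimal shape, inferred from how the call sites below use it (a sketch, not the committed source), is:

    package org.apache.cassandra.db.lifecycle;

    import org.apache.cassandra.db.compaction.OperationType;
    import org.apache.cassandra.io.sstable.SSTable;

    // Minimal tracking interface: writers register new sstables with trackNew()
    // and deregister them with untrackNew(); opType() reports the operation
    // (e.g. STREAM or WRITE) on whose behalf the sstables are being created.
    public interface LifecycleNewTracker
    {
        void trackNew(SSTable table);

        void untrackNew(SSTable table);

        OperationType opType();
    }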


http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 75b05ea,20da1fa..b8410b8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -338,7 -2,9 +338,8 @@@
   * Make stop-server.bat wait for Cassandra to terminate (CASSANDRA-14829)
  * Correct sstable sorting for garbagecollect and levelled compaction (CASSANDRA-14870)
 Merged from 3.0:
+  * Streaming needs to synchronise access to LifecycleTransaction (CASSANDRA-14554)
  * Fix cassandra-stress write hang with default options (CASSANDRA-14616)
 - * Differentiate between slices and RTs when decoding legacy bounds (CASSANDRA-14919)
  * Netty epoll IOExceptions caused by unclean client disconnects being logged at INFO (CASSANDRA-14909)
  * Unfiltered.isEmpty conflicts with Row extends AbstractCollection.isEmpty (CASSANDRA-14588)
  * RangeTombstoneList doesn't properly clean up mergeable or superseded rts in some cases (CASSANDRA-14894)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index c5149cf,700c1cc..cc6af6f
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -497,15 -507,15 +497,15 @@@ public class ColumnFamilyStore implemen
          return directories;
      }
  
-     public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
 -    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
++    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
      {
 -        MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
 -        return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker);
 +        MetadataCollector collector = new MetadataCollector(metadata().comparator).sstableLevel(sstableLevel);
-         return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, collector, header, txn);
++        return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, collector, header, lifecycleNewTracker);
      }
  
-     public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn)
 -    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
++    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, MetadataCollector metadataCollector, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
      {
-         return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadataCollector, header, indexManager.listIndexes(), txn);
 -        return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, indexManager.listIndexes(), lifecycleNewTracker);
++        return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadataCollector, header, indexManager.listIndexes(), lifecycleNewTracker);
      }
  
      public boolean supportsEarlyOpen()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 28ea90a,3d7800d..9d7825c
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@@ -534,9 -583,9 +535,9 @@@ public abstract class AbstractCompactio
                                                         MetadataCollector meta,
                                                        SerializationHeader header,
                                                        Collection<Index> indexes,
-                                                        LifecycleTransaction txn)
+                                                        LifecycleNewTracker lifecycleNewTracker)
      {
-         return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, cfs.metadata, meta, header, indexes, txn);
 -        return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, indexes, lifecycleNewTracker);
++        return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, cfs.metadata, meta, header, indexes, lifecycleNewTracker);
      }
  
      public boolean supportsEarlyOpen()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
index 24bea06,0000000..63b7909
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
@@@ -1,209 -1,0 +1,210 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db.compaction;
 +
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.function.Supplier;
 +
 +import com.google.common.base.Preconditions;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.SerializationHeader;
++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.schema.CompactionParams;
 +
 +/**
 + * Wrapper that's aware of how sstables are divided between separate strategies,
 + * and provides a standard interface to them
 + *
 + * not threadsafe, calls must be synchronized by caller
 + */
 +public abstract class AbstractStrategyHolder
 +{
 +    public static class TaskSupplier implements Comparable<TaskSupplier>
 +    {
 +        private final int numRemaining;
 +        private final Supplier<AbstractCompactionTask> supplier;
 +
 +        TaskSupplier(int numRemaining, Supplier<AbstractCompactionTask> supplier)
 +        {
 +            this.numRemaining = numRemaining;
 +            this.supplier = supplier;
 +        }
 +
 +        public AbstractCompactionTask getTask()
 +        {
 +            return supplier.get();
 +        }
 +
 +        public int compareTo(TaskSupplier o)
 +        {
 +            return o.numRemaining - numRemaining;
 +        }
 +    }
 +
 +    public static interface DestinationRouter
 +    {
 +        int getIndexForSSTable(SSTableReader sstable);
 +        int getIndexForSSTableDirectory(Descriptor descriptor);
 +    }
 +
 +    /**
 +     * Maps sstables to their token partition bucket
 +     */
 +    static class GroupedSSTableContainer
 +    {
 +        private final AbstractStrategyHolder holder;
 +        private final Set<SSTableReader>[] groups;
 +
 +        private GroupedSSTableContainer(AbstractStrategyHolder holder)
 +        {
 +            this.holder = holder;
 +            Preconditions.checkArgument(holder.numTokenPartitions > 0, "numTokenPartitions not set");
 +            groups = new Set[holder.numTokenPartitions];
 +        }
 +
 +        void add(SSTableReader sstable)
 +        {
 +            Preconditions.checkArgument(holder.managesSSTable(sstable), "this strategy holder doesn't manage %s", sstable);
 +            int idx = holder.router.getIndexForSSTable(sstable);
 +            Preconditions.checkState(idx >= 0 && idx < holder.numTokenPartitions, "Invalid sstable index (%s) for %s", idx, sstable);
 +            if (groups[idx] == null)
 +                groups[idx] = new HashSet<>();
 +            groups[idx].add(sstable);
 +        }
 +
 +        int numGroups()
 +        {
 +            return groups.length;
 +        }
 +
 +        Set<SSTableReader> getGroup(int i)
 +        {
 +            Preconditions.checkArgument(i >= 0 && i < groups.length);
 +            Set<SSTableReader> group = groups[i];
 +            return group != null ? group : Collections.emptySet();
 +        }
 +
 +        boolean isGroupEmpty(int i)
 +        {
 +            return getGroup(i).isEmpty();
 +        }
 +
 +        boolean isEmpty()
 +        {
 +            for (int i = 0; i < groups.length; i++)
 +                if (!isGroupEmpty(i))
 +                    return false;
 +            return true;
 +        }
 +    }
 +
 +    protected final ColumnFamilyStore cfs;
 +    final DestinationRouter router;
 +    private int numTokenPartitions = -1;
 +
 +    AbstractStrategyHolder(ColumnFamilyStore cfs, DestinationRouter router)
 +    {
 +        this.cfs = cfs;
 +        this.router = router;
 +    }
 +
 +    public abstract void startup();
 +
 +    public abstract void shutdown();
 +
 +    final void setStrategy(CompactionParams params, int numTokenPartitions)
 +    {
 +        Preconditions.checkArgument(numTokenPartitions > 0, "at least one token partition required");
 +        shutdown();
 +        this.numTokenPartitions = numTokenPartitions;
 +        setStrategyInternal(params, numTokenPartitions);
 +    }
 +
 +    protected abstract void setStrategyInternal(CompactionParams params, int numTokenPartitions);
 +
 +    /**
 +     * SSTables are grouped by their repaired and pending repair status. This method determines if this holder
 +     * holds the sstable for the given repaired/grouped statuses. Holders should be mutually exclusive in the
 +     * groups they deal with. IOW, if one holder returns true for a given isRepaired/isPendingRepair combo,
 +     * none of the others should.
 +     */
 +    public abstract boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair, boolean isTransient);
 +
 +    public boolean managesSSTable(SSTableReader sstable)
 +    {
 +        return managesRepairedGroup(sstable.isRepaired(), sstable.isPendingRepair(), sstable.isTransient());
 +    }
 +
 +    public abstract AbstractCompactionStrategy getStrategyFor(SSTableReader sstable);
 +
 +    public abstract Iterable<AbstractCompactionStrategy> allStrategies();
 +
 +    public abstract Collection<TaskSupplier> getBackgroundTaskSuppliers(int gcBefore);
 +
 +    public abstract Collection<AbstractCompactionTask> getMaximalTasks(int gcBefore, boolean splitOutput);
 +
 +    public abstract Collection<AbstractCompactionTask> getUserDefinedTasks(GroupedSSTableContainer sstables, int gcBefore);
 +
 +    public GroupedSSTableContainer createGroupedSSTableContainer()
 +    {
 +        return new GroupedSSTableContainer(this);
 +    }
 +
 +    public abstract void addSSTables(GroupedSSTableContainer sstables);
 +
 +    public abstract void removeSSTables(GroupedSSTableContainer sstables);
 +
 +    public abstract void replaceSSTables(GroupedSSTableContainer removed, GroupedSSTableContainer added);
 +
 +    public abstract List<ISSTableScanner> getScanners(GroupedSSTableContainer sstables, Collection<Range<Token>> ranges);
 +
 +
 +    public abstract SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
 +                                                                long keyCount,
 +                                                                long repairedAt,
 +                                                                UUID pendingRepair,
 +                                                                boolean isTransient,
 +                                                                MetadataCollector collector,
 +                                                                SerializationHeader header,
 +                                                                Collection<Index> indexes,
-                                                                 LifecycleTransaction txn);
++                                                                LifecycleNewTracker lifecycleNewTracker);
 +
 +    /**
 +     * Return the directory index the given compaction strategy belongs to, or -1
 +     * if it's not held by this holder
 +     */
 +    public abstract int getStrategyIndex(AbstractCompactionStrategy strategy);
 +
 +    public abstract boolean containsSSTable(SSTableReader sstable);
 +}
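
The "mutually exclusive holders" contract documented in the javadoc above is what lets CompactionStrategyManager route each sstable group to exactly one holder (see the getHolder(repairedAt, pendingRepair, isTransient) call in the CompactionStrategyManager hunk further down). A hedged sketch of such routing, using a hypothetical getHolder helper that is not part of this patch:

    // Illustration only: pick the single holder that claims a given
    // repaired/pending-repair/transient combination.
    static AbstractStrategyHolder getHolder(java.util.List<AbstractStrategyHolder> holders,
                                            boolean isRepaired,
                                            boolean isPendingRepair,
                                            boolean isTransient)
    {
        for (AbstractStrategyHolder holder : holders)
            if (holder.managesRepairedGroup(isRepaired, isPendingRepair, isTransient))
                return holder; // at most one holder may return true, per the javadoc
        throw new IllegalStateException("no holder claims this sstable group");
    }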

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
index 8ce93fa,0000000..129ee79
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
@@@ -1,264 -1,0 +1,265 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db.compaction;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.List;
 +import java.util.UUID;
 +
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.Iterables;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.SerializationHeader;
++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.schema.CompactionParams;
 +import org.apache.cassandra.service.ActiveRepairService;
 +
 +public class CompactionStrategyHolder extends AbstractStrategyHolder
 +{
 +    private final List<AbstractCompactionStrategy> strategies = new ArrayList<>();
 +    private final boolean isRepaired;
 +
 +    public CompactionStrategyHolder(ColumnFamilyStore cfs, DestinationRouter router, boolean isRepaired)
 +    {
 +        super(cfs, router);
 +        this.isRepaired = isRepaired;
 +    }
 +
 +    @Override
 +    public void startup()
 +    {
 +        strategies.forEach(AbstractCompactionStrategy::startup);
 +    }
 +
 +    @Override
 +    public void shutdown()
 +    {
 +        strategies.forEach(AbstractCompactionStrategy::shutdown);
 +    }
 +
 +    @Override
 +    public void setStrategyInternal(CompactionParams params, int numTokenPartitions)
 +    {
 +        strategies.clear();
 +        for (int i = 0; i < numTokenPartitions; i++)
 +            strategies.add(cfs.createCompactionStrategyInstance(params));
 +    }
 +
 +    @Override
 +    public boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair, boolean isTransient)
 +    {
 +        if (!isPendingRepair)
 +        {
 +            Preconditions.checkArgument(!isTransient, "isTransient can only be true for sstables pending repairs");
 +            return this.isRepaired == isRepaired;
 +        }
 +        else
 +        {
 +            Preconditions.checkArgument(!isRepaired, "SSTables cannot be both repaired and pending repair");
 +            return false;
 +
 +        }
 +    }
 +
 +    @Override
 +    public AbstractCompactionStrategy getStrategyFor(SSTableReader sstable)
 +    {
 +        Preconditions.checkArgument(managesSSTable(sstable), "Attempting to get compaction strategy from wrong holder");
 +        return strategies.get(router.getIndexForSSTable(sstable));
 +    }
 +
 +    @Override
 +    public Iterable<AbstractCompactionStrategy> allStrategies()
 +    {
 +        return strategies;
 +    }
 +
 +    @Override
 +    public Collection<TaskSupplier> getBackgroundTaskSuppliers(int gcBefore)
 +    {
 +        List<TaskSupplier> suppliers = new ArrayList<>(strategies.size());
 +        for (AbstractCompactionStrategy strategy : strategies)
 +            suppliers.add(new TaskSupplier(strategy.getEstimatedRemainingTasks(), () -> strategy.getNextBackgroundTask(gcBefore)));
 +
 +        return suppliers;
 +    }
 +
 +    @Override
 +    public Collection<AbstractCompactionTask> getMaximalTasks(int gcBefore, boolean splitOutput)
 +    {
 +        List<AbstractCompactionTask> tasks = new ArrayList<>(strategies.size());
 +        for (AbstractCompactionStrategy strategy : strategies)
 +        {
 +            Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput);
 +            if (task != null)
 +                tasks.addAll(task);
 +        }
 +        return tasks;
 +    }
 +
 +    @Override
 +    public Collection<AbstractCompactionTask> getUserDefinedTasks(GroupedSSTableContainer sstables, int gcBefore)
 +    {
 +        List<AbstractCompactionTask> tasks = new ArrayList<>(strategies.size());
 +        for (int i = 0; i < strategies.size(); i++)
 +        {
 +            if (sstables.isGroupEmpty(i))
 +                continue;
 +
 +            tasks.add(strategies.get(i).getUserDefinedTask(sstables.getGroup(i), gcBefore));
 +        }
 +        return tasks;
 +    }
 +
 +    @Override
 +    public void addSSTables(GroupedSSTableContainer sstables)
 +    {
 +        Preconditions.checkArgument(sstables.numGroups() == strategies.size());
 +        for (int i = 0; i < strategies.size(); i++)
 +        {
 +            if (!sstables.isGroupEmpty(i))
 +                strategies.get(i).addSSTables(sstables.getGroup(i));
 +        }
 +    }
 +
 +    @Override
 +    public void removeSSTables(GroupedSSTableContainer sstables)
 +    {
 +        Preconditions.checkArgument(sstables.numGroups() == strategies.size());
 +        for (int i = 0; i < strategies.size(); i++)
 +        {
 +            if (!sstables.isGroupEmpty(i))
 +                strategies.get(i).removeSSTables(sstables.getGroup(i));
 +        }
 +    }
 +
 +    @Override
 +    public void replaceSSTables(GroupedSSTableContainer removed, GroupedSSTableContainer added)
 +    {
 +        Preconditions.checkArgument(removed.numGroups() == strategies.size());
 +        Preconditions.checkArgument(added.numGroups() == strategies.size());
 +        for (int i = 0; i < strategies.size(); i++)
 +        {
 +            if (removed.isGroupEmpty(i) && added.isGroupEmpty(i))
 +                continue;
 +
 +            if (removed.isGroupEmpty(i))
 +                strategies.get(i).addSSTables(added.getGroup(i));
 +            else
 +                strategies.get(i).replaceSSTables(removed.getGroup(i), added.getGroup(i));
 +        }
 +    }
 +
 +    public AbstractCompactionStrategy first()
 +    {
 +        return strategies.get(0);
 +    }
 +
 +    @Override
 +    @SuppressWarnings("resource")
 +    public List<ISSTableScanner> getScanners(GroupedSSTableContainer sstables, Collection<Range<Token>> ranges)
 +    {
 +        List<ISSTableScanner> scanners = new ArrayList<>(strategies.size());
 +        for (int i = 0; i < strategies.size(); i++)
 +        {
 +            if (sstables.isGroupEmpty(i))
 +                continue;
 +
 +            scanners.addAll(strategies.get(i).getScanners(sstables.getGroup(i), ranges).scanners);
 +        }
 +        return scanners;
 +    }
 +
 +    Collection<Collection<SSTableReader>> groupForAnticompaction(Iterable<SSTableReader> sstables)
 +    {
 +        Preconditions.checkState(!isRepaired);
 +        GroupedSSTableContainer group = createGroupedSSTableContainer();
 +        sstables.forEach(group::add);
 +
 +        Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
 +        for (int i = 0; i < strategies.size(); i++)
 +        {
 +            if (group.isGroupEmpty(i))
 +                continue;
 +
 +            anticompactionGroups.addAll(strategies.get(i).groupSSTablesForAntiCompaction(group.getGroup(i)));
 +        }
 +
 +        return anticompactionGroups;
 +    }
 +
 +    @Override
 +    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
 +                                                       long keyCount,
 +                                                       long repairedAt,
 +                                                       UUID pendingRepair,
 +                                                       boolean isTransient,
 +                                                       MetadataCollector collector,
 +                                                       SerializationHeader header,
 +                                                       Collection<Index> indexes,
-                                                        LifecycleTransaction txn)
++                                                       LifecycleNewTracker lifecycleNewTracker)
 +    {
 +        if (isRepaired)
 +        {
 +            Preconditions.checkArgument(repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE,
 +                                        "Repaired CompactionStrategyHolder can't create unrepaired sstable writers");
 +        }
 +        else
 +        {
 +            Preconditions.checkArgument(repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE,
 +                                        "Unrepaired CompactionStrategyHolder can't create repaired sstable writers");
 +        }
 +        Preconditions.checkArgument(pendingRepair == null,
 +                                    "CompactionStrategyHolder can't create sstable writer with pendingRepair id");
 +        // to avoid creating a compaction strategy for the wrong pending repair manager, we get the index based on where the sstable is to be written
 +        AbstractCompactionStrategy strategy = strategies.get(router.getIndexForSSTableDirectory(descriptor));
 +        return strategy.createSSTableMultiWriter(descriptor,
 +                                                 keyCount,
 +                                                 repairedAt,
 +                                                 pendingRepair,
 +                                                 isTransient,
 +                                                 collector,
 +                                                 header,
 +                                                 indexes,
-                                                  txn);
++                                                 lifecycleNewTracker);
 +    }
 +
 +    @Override
 +    public int getStrategyIndex(AbstractCompactionStrategy strategy)
 +    {
 +        return strategies.indexOf(strategy);
 +    }
 +
 +    @Override
 +    public boolean containsSSTable(SSTableReader sstable)
 +    {
 +        return Iterables.any(strategies, acs -> acs.getSSTables().contains(sstable));
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 45ccbe2,86170a1..b978641
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -33,12 -23,13 +33,13 @@@ import java.util.concurrent.Callable
  import java.util.concurrent.locks.ReentrantReadWriteLock;
  import java.util.function.Supplier;
  import java.util.stream.Collectors;
 -import java.util.stream.Stream;
  
  import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.ImmutableList;
  import com.google.common.collect.Iterables;
 -import com.google.common.primitives.Ints;
 -
 +import com.google.common.collect.Lists;
 +import com.google.common.primitives.Longs;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -1117,22 -1016,20 +1118,22 @@@ public class CompactionStrategyManager 
                                                        MetadataCollector collector,
                                                        SerializationHeader header,
                                                        Collection<Index> indexes,
-                                                        LifecycleTransaction txn)
+                                                        LifecycleNewTracker lifecycleNewTracker)
      {
 +        SSTable.validateRepairedMetadata(repairedAt, pendingRepair, isTransient);
          maybeReloadDiskBoundaries();
          readLock.lock();
          try
          {
 -            if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
 -            {
 -                return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, lifecycleNewTracker);
 -            }
 -            else
 -            {
 -                return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, lifecycleNewTracker);
 -            }
 +            return getHolder(repairedAt, pendingRepair, isTransient).createSSTableMultiWriter(descriptor,
 +                                                                                              keyCount,
 +                                                                                              repairedAt,
 +                                                                                              pendingRepair,
 +                                                                                              isTransient,
 +                                                                                              collector,
 +                                                                                              header,
 +                                                                                              indexes,
-                                                                                               txn);
++                                                                                              lifecycleNewTracker);
          }
          finally
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
index 92e44a7,0000000..03d4111
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
@@@ -1,284 -1,0 +1,285 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db.compaction;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.UUID;
 +
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.Iterables;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.SerializationHeader;
++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.schema.CompactionParams;
 +import org.apache.cassandra.service.ActiveRepairService;
 +
 +public class PendingRepairHolder extends AbstractStrategyHolder
 +{
 +    private final List<PendingRepairManager> managers = new ArrayList<>();
 +    private final boolean isTransient;
 +
 +    public PendingRepairHolder(ColumnFamilyStore cfs, DestinationRouter router, boolean isTransient)
 +    {
 +        super(cfs, router);
 +        this.isTransient = isTransient;
 +    }
 +
 +    @Override
 +    public void startup()
 +    {
 +        managers.forEach(PendingRepairManager::startup);
 +    }
 +
 +    @Override
 +    public void shutdown()
 +    {
 +        managers.forEach(PendingRepairManager::shutdown);
 +    }
 +
 +    @Override
 +    public void setStrategyInternal(CompactionParams params, int numTokenPartitions)
 +    {
 +        managers.clear();
 +        for (int i = 0; i < numTokenPartitions; i++)
 +            managers.add(new PendingRepairManager(cfs, params, isTransient));
 +    }
 +
 +    @Override
 +    public boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair, boolean isTransient)
 +    {
 +        Preconditions.checkArgument(!isPendingRepair || !isRepaired,
 +                                    "SSTables cannot be both repaired and pending repair");
 +        return isPendingRepair && (this.isTransient == isTransient);
 +    }
 +
 +    @Override
 +    public AbstractCompactionStrategy getStrategyFor(SSTableReader sstable)
 +    {
 +        Preconditions.checkArgument(managesSSTable(sstable), "Attempting to get compaction strategy from wrong holder");
 +        return managers.get(router.getIndexForSSTable(sstable)).getOrCreate(sstable);
 +    }
 +
 +    @Override
 +    public Iterable<AbstractCompactionStrategy> allStrategies()
 +    {
 +        return Iterables.concat(Iterables.transform(managers, PendingRepairManager::getStrategies));
 +    }
 +
 +    Iterable<AbstractCompactionStrategy> getStrategiesFor(UUID session)
 +    {
 +        List<AbstractCompactionStrategy> strategies = new ArrayList<>(managers.size());
 +        for (PendingRepairManager manager : managers)
 +        {
 +            AbstractCompactionStrategy strategy = manager.get(session);
 +            if (strategy != null)
 +                strategies.add(strategy);
 +        }
 +        return strategies;
 +    }
 +
 +    public Iterable<PendingRepairManager> getManagers()
 +    {
 +        return managers;
 +    }
 +
 +    @Override
 +    public Collection<TaskSupplier> getBackgroundTaskSuppliers(int gcBefore)
 +    {
 +        List<TaskSupplier> suppliers = new ArrayList<>(managers.size());
 +        for (PendingRepairManager manager : managers)
 +            suppliers.add(new TaskSupplier(manager.getMaxEstimatedRemainingTasks(), () -> manager.getNextBackgroundTask(gcBefore)));
 +
 +        return suppliers;
 +    }
 +
 +    @Override
 +    public Collection<AbstractCompactionTask> getMaximalTasks(int gcBefore, boolean splitOutput)
 +    {
 +        List<AbstractCompactionTask> tasks = new ArrayList<>(managers.size());
 +        for (PendingRepairManager manager : managers)
 +        {
 +            Collection<AbstractCompactionTask> task = manager.getMaximalTasks(gcBefore, splitOutput);
 +            if (task != null)
 +                tasks.addAll(task);
 +        }
 +        return tasks;
 +    }
 +
 +    @Override
 +    public Collection<AbstractCompactionTask> getUserDefinedTasks(GroupedSSTableContainer sstables, int gcBefore)
 +    {
 +        List<AbstractCompactionTask> tasks = new ArrayList<>(managers.size());
 +
 +        for (int i = 0; i < managers.size(); i++)
 +        {
 +            if (sstables.isGroupEmpty(i))
 +                continue;
 +
 +            tasks.addAll(managers.get(i).createUserDefinedTasks(sstables.getGroup(i), gcBefore));
 +        }
 +        return tasks;
 +    }
 +
 +    AbstractCompactionTask getNextRepairFinishedTask()
 +    {
 +        List<TaskSupplier> repairFinishedSuppliers = getRepairFinishedTaskSuppliers();
 +        if (!repairFinishedSuppliers.isEmpty())
 +        {
 +            Collections.sort(repairFinishedSuppliers);
 +            for (TaskSupplier supplier : repairFinishedSuppliers)
 +            {
 +                AbstractCompactionTask task = supplier.getTask();
 +                if (task != null)
 +                    return task;
 +            }
 +        }
 +        return null;
 +    }
 +
 +    private ArrayList<TaskSupplier> getRepairFinishedTaskSuppliers()
 +    {
 +        ArrayList<TaskSupplier> suppliers = new ArrayList<>(managers.size());
 +        for (PendingRepairManager manager : managers)
 +        {
 +            int numPending = manager.getNumPendingRepairFinishedTasks();
 +            if (numPending > 0)
 +            {
 +                suppliers.add(new TaskSupplier(numPending, manager::getNextRepairFinishedTask));
 +            }
 +        }
 +
 +        return suppliers;
 +    }
 +
 +    @Override
 +    public void addSSTables(GroupedSSTableContainer sstables)
 +    {
 +        Preconditions.checkArgument(sstables.numGroups() == managers.size());
 +        for (int i = 0; i < managers.size(); i++)
 +        {
 +            if (!sstables.isGroupEmpty(i))
 +                managers.get(i).addSSTables(sstables.getGroup(i));
 +        }
 +    }
 +
 +    @Override
 +    public void removeSSTables(GroupedSSTableContainer sstables)
 +    {
 +        Preconditions.checkArgument(sstables.numGroups() == managers.size());
 +        for (int i = 0; i < managers.size(); i++)
 +        {
 +            if (!sstables.isGroupEmpty(i))
 +                managers.get(i).removeSSTables(sstables.getGroup(i));
 +        }
 +    }
 +
 +    @Override
 +    public void replaceSSTables(GroupedSSTableContainer removed, GroupedSSTableContainer added)
 +    {
 +        Preconditions.checkArgument(removed.numGroups() == managers.size());
 +        Preconditions.checkArgument(added.numGroups() == managers.size());
 +        for (int i = 0; i < managers.size(); i++)
 +        {
 +            if (removed.isGroupEmpty(i) && added.isGroupEmpty(i))
 +                continue;
 +
 +            if (removed.isGroupEmpty(i))
 +                managers.get(i).addSSTables(added.getGroup(i));
 +            else
 +                managers.get(i).replaceSSTables(removed.getGroup(i), added.getGroup(i));
 +        }
 +    }
 +
 +    @Override
 +    public List<ISSTableScanner> getScanners(GroupedSSTableContainer sstables, Collection<Range<Token>> ranges)
 +    {
 +        List<ISSTableScanner> scanners = new ArrayList<>(managers.size());
 +        for (int i = 0; i < managers.size(); i++)
 +        {
 +            if (sstables.isGroupEmpty(i))
 +                continue;
 +
 +            scanners.addAll(managers.get(i).getScanners(sstables.getGroup(i), ranges));
 +        }
 +        return scanners;
 +    }
 +
 +    @Override
 +    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
 +                                                       long keyCount,
 +                                                       long repairedAt,
 +                                                       UUID pendingRepair,
 +                                                       boolean isTransient,
 +                                                       MetadataCollector collector,
 +                                                       SerializationHeader header,
 +                                                       Collection<Index> indexes,
-                                                        LifecycleTransaction txn)
++                                                       LifecycleNewTracker lifecycleNewTracker)
 +    {
 +        Preconditions.checkArgument(repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE,
 +                                    "PendingRepairHolder can't create sstablewriter with repaired at set");
 +        Preconditions.checkArgument(pendingRepair != null,
 +                                    "PendingRepairHolder can't create sstable writer without pendingRepair id");
 +        // to avoid creating a compaction strategy for the wrong pending repair manager, we get the index based on where the sstable is to be written
 +        AbstractCompactionStrategy strategy = managers.get(router.getIndexForSSTableDirectory(descriptor)).getOrCreate(pendingRepair);
 +        return strategy.createSSTableMultiWriter(descriptor,
 +                                                 keyCount,
 +                                                 repairedAt,
 +                                                 pendingRepair,
 +                                                 isTransient,
 +                                                 collector,
 +                                                 header,
 +                                                 indexes,
-                                                  txn);
++                                                 lifecycleNewTracker);
 +    }
 +
 +    @Override
 +    public int getStrategyIndex(AbstractCompactionStrategy strategy)
 +    {
 +        for (int i = 0; i < managers.size(); i++)
 +        {
 +            if (managers.get(i).hasStrategy(strategy))
 +                return i;
 +        }
 +        return -1;
 +    }
 +
 +    public boolean hasDataForSession(UUID sessionID)
 +    {
 +        return Iterables.any(managers, prm -> prm.hasDataForSession(sessionID));
 +    }
 +
 +    @Override
 +    public boolean containsSSTable(SSTableReader sstable)
 +    {
 +        return Iterables.any(managers, prm -> prm.containsSSTable(sstable));
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
index a2c1c32,12531cd..64d6af0
--- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
@@@ -420,7 -420,7 +420,7 @@@ class LogTransaction extends Transactio
       * for further details on transaction logs.
       *
      * This method is called on startup and by the standalone sstableutil tool when the cleanup option is specified,
--     * @see StandaloneSSTableUtil.
++     * @see org.apache.cassandra.tools.StandaloneSSTableUtil
      *
      * @return true if the leftovers of all transaction logs found were removed, false otherwise.
       *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
index 6f8f06a,0000000..479ee71
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
@@@ -1,177 -1,0 +1,178 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db.streaming;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.Collection;
 +
 +import com.google.common.base.Throwables;
++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Directories;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.io.sstable.format.SSTableFormat;
 +import org.apache.cassandra.io.sstable.format.big.BigTableZeroCopyWriter;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.schema.TableId;
 +import org.apache.cassandra.streaming.ProgressInfo;
 +import org.apache.cassandra.streaming.StreamReceiver;
 +import org.apache.cassandra.streaming.StreamSession;
 +import org.apache.cassandra.streaming.messages.StreamMessageHeader;
 +
 +import static java.lang.String.format;
 +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
 +
 +/**
 + * CassandraEntireSSTableStreamReader reads SSTable off the wire and writes it to disk.
 + */
 +public class CassandraEntireSSTableStreamReader implements IStreamReader
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CassandraEntireSSTableStreamReader.class);
 +
 +    private final TableId tableId;
 +    private final StreamSession session;
 +    private final CassandraStreamHeader header;
 +    private final int fileSequenceNumber;
 +
 +    public CassandraEntireSSTableStreamReader(StreamMessageHeader messageHeader, CassandraStreamHeader streamHeader, StreamSession session)
 +    {
 +        if (streamHeader.format != SSTableFormat.Type.BIG)
 +            throw new AssertionError("Unsupported SSTable format " + 
streamHeader.format);
 +
 +        if (session.getPendingRepair() != null)
 +        {
 +            // we should only ever be streaming pending repair sstables if the session has a pending repair id
 +            if (!session.getPendingRepair().equals(messageHeader.pendingRepair))
 +                throw new IllegalStateException(format("Stream Session & SSTable (%s) pendingRepair UUID mismatch.", messageHeader.tableId));
 +        }
 +
 +        this.header = streamHeader;
 +        this.session = session;
 +        this.tableId = messageHeader.tableId;
 +        this.fileSequenceNumber = messageHeader.sequenceNumber;
 +    }
 +
 +    /**
 +     * @param in where this reads data from
 +     * @return SSTable transferred
 +     * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
 +     */
 +    @SuppressWarnings("resource") // input needs to remain open, streams on 
top of it can't be closed
 +    @Override
 +    public SSTableMultiWriter read(DataInputPlus in) throws IOException
 +    {
 +        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
 +        if (cfs == null)
 +        {
 +            // schema was dropped during streaming
 +            throw new IOException("Table " + tableId + " was dropped during 
streaming");
 +        }
 +
 +        ComponentManifest manifest = header.componentManifest;
 +        long totalSize = manifest.totalSize();
 +
 +        logger.debug("[Stream #{}] Started receiving sstable #{} from {}, 
size = {}, table = {}",
 +                     session.planId(),
 +                     fileSequenceNumber,
 +                     session.peer,
 +                     prettyPrintMemory(totalSize),
 +                     cfs.metadata());
 +
 +        BigTableZeroCopyWriter writer = null;
 +
 +        try
 +        {
 +            writer = createWriter(cfs, totalSize, manifest.components());
 +            long bytesRead = 0;
 +            for (Component component : manifest.components())
 +            {
 +                long length = manifest.sizeOf(component);
 +
 +                logger.debug("[Stream #{}] Started receiving {} component 
from {}, componentSize = {}, readBytes = {}, totalSize = {}",
 +                             session.planId(),
 +                             component,
 +                             session.peer,
 +                             prettyPrintMemory(length),
 +                             prettyPrintMemory(bytesRead),
 +                             prettyPrintMemory(totalSize));
 +
 +                writer.writeComponent(component.type, in, length);
 +                session.progress(writer.descriptor.filenameFor(component), ProgressInfo.Direction.IN, length, length);
 +                bytesRead += length;
 +
 +                logger.debug("[Stream #{}] Finished receiving {} component 
from {}, componentSize = {}, readBytes = {}, totalSize = {}",
 +                             session.planId(),
 +                             component,
 +                             session.peer,
 +                             prettyPrintMemory(length),
 +                             prettyPrintMemory(bytesRead),
 +                             prettyPrintMemory(totalSize));
 +            }
 +
 +            writer.descriptor.getMetadataSerializer().mutateLevel(writer.descriptor, header.sstableLevel);
 +            return writer;
 +        }
 +        catch (Throwable e)
 +        {
 +            logger.error("[Stream {}] Error while reading sstable from stream 
for table = {}", session.planId(), cfs.metadata(), e);
 +            if (writer != null)
 +                e = writer.abort(e);
 +            Throwables.throwIfUnchecked(e);
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    private File getDataDir(ColumnFamilyStore cfs, long totalSize) throws IOException
 +    {
 +        Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
 +        if (localDir == null)
 +            throw new IOException(format("Insufficient disk space to store 
%s", prettyPrintMemory(totalSize)));
 +
 +        File dir = cfs.getDirectories().getLocationForDisk(cfs.getDiskBoundaries().getCorrectDiskForKey(header.firstKey));
 +
 +        if (dir == null)
 +            return cfs.getDirectories().getDirectoryForNewSSTables();
 +
 +        return dir;
 +    }
 +
 +    @SuppressWarnings("resource")
 +    protected BigTableZeroCopyWriter createWriter(ColumnFamilyStore cfs, long totalSize, Collection<Component> components) throws IOException
 +    {
 +        File dataDir = getDataDir(cfs, totalSize);
 +
 +        StreamReceiver streamReceiver = session.getAggregator(tableId);
 +        assert streamReceiver instanceof CassandraStreamReceiver;
 +
-         LifecycleTransaction txn = CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).getTransaction();
++        LifecycleNewTracker lifecycleNewTracker = CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).createLifecycleNewTracker();
 +
 +        Descriptor desc = cfs.newSSTableDescriptor(dataDir, header.version, header.format);
 +
 +        logger.debug("[Table #{}] {} Components to write: {}", 
cfs.metadata(), desc.filenameFor(Component.DATA), components);
 +
-         return new BigTableZeroCopyWriter(desc, cfs.metadata, txn, components);
++        return new BigTableZeroCopyWriter(desc, cfs.metadata, lifecycleNewTracker, components);
 +    }
 +}
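
The CassandraStreamReceiver hunk itself is not shown in this excerpt, but the createLifecycleNewTracker() call above is where the CASSANDRA-14554 fix bites: rather than exposing its shared LifecycleTransaction to concurrently running stream readers, the receiver can hand each of them a tracker that serialises access. A hedged sketch of what such a method could look like, assuming the receiver keeps the transaction in a field named txn (the real hunk may differ):

    // Sketch only: delegate trackNew/untrackNew to the shared transaction while
    // holding the receiver's monitor, so concurrent stream readers cannot
    // corrupt the transaction's internal state.
    public LifecycleNewTracker createLifecycleNewTracker()
    {
        return new LifecycleNewTracker()
        {
            public void trackNew(SSTable table)
            {
                synchronized (CassandraStreamReceiver.this)
                {
                    txn.trackNew(table);
                }
            }

            public void untrackNew(SSTable table)
            {
                synchronized (CassandraStreamReceiver.this)
                {
                    txn.untrackNew(table);
                }
            }

            public OperationType opType()
            {
                return OperationType.STREAM;
            }
        };
    }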

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
index 572c648,0000000..43371a9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@@ -1,286 -1,0 +1,287 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.streaming;
 +
 +import java.io.*;
 +import java.util.Collection;
 +import java.util.UUID;
 +
 +import com.google.common.base.Preconditions;
 +import com.google.common.base.Throwables;
 +import com.google.common.collect.UnmodifiableIterator;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.TrackedDataInputPlus;
 +import org.apache.cassandra.schema.TableId;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.db.*;
++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
 +import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
 +import org.apache.cassandra.io.sstable.format.SSTableFormat;
 +import org.apache.cassandra.io.sstable.format.Version;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.streaming.ProgressInfo;
 +import org.apache.cassandra.streaming.StreamReceiver;
 +import org.apache.cassandra.streaming.StreamSession;
 +import org.apache.cassandra.streaming.compress.StreamCompressionInputStream;
 +import org.apache.cassandra.streaming.messages.StreamMessageHeader;
 +import org.apache.cassandra.streaming.messages.StreamMessage;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +/**
 + * CassandraStreamReader reads from stream and writes to SSTable.
 + */
 +public class CassandraStreamReader implements IStreamReader
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CassandraStreamReader.class);
 +    protected final TableId tableId;
 +    protected final long estimatedKeys;
 +    protected final Collection<SSTableReader.PartitionPositionBounds> sections;
 +    protected final StreamSession session;
 +    protected final Version inputVersion;
 +    protected final long repairedAt;
 +    protected final UUID pendingRepair;
 +    protected final SSTableFormat.Type format;
 +    protected final int sstableLevel;
 +    protected final SerializationHeader.Component header;
 +    protected final int fileSeqNum;
 +
 +    public CassandraStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session)
 +    {
 +        if (session.getPendingRepair() != null)
 +        {
 +            // we should only ever be streaming pending repair
 +            // sstables if the session has a pending repair id
 +            assert session.getPendingRepair().equals(header.pendingRepair);
 +        }
 +        this.session = session;
 +        this.tableId = header.tableId;
 +        this.estimatedKeys = streamHeader.estimatedKeys;
 +        this.sections = streamHeader.sections;
 +        this.inputVersion = streamHeader.version;
 +        this.repairedAt = header.repairedAt;
 +        this.pendingRepair = header.pendingRepair;
 +        this.format = streamHeader.format;
 +        this.sstableLevel = streamHeader.sstableLevel;
 +        this.header = streamHeader.serializationHeader;
 +        this.fileSeqNum = header.sequenceNumber;
 +    }
 +
 +    /**
 +     * @param inputPlus where this reads data from
 +     * @return SSTable transferred
 +     * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
 +     */
 +    @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
 +    @Override
 +    public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
 +    {
 +        long totalSize = totalSize();
 +
 +        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
 +        if (cfs == null)
 +        {
 +            // schema was dropped during streaming
 +            throw new IOException("CF " + tableId + " was dropped during streaming");
 +        }
 +
 +        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}', pendingRepair = '{}'.",
 +                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
 +                     cfs.getTableName(), pendingRepair);
 +
 +        StreamDeserializer deserializer = null;
 +        SSTableMultiWriter writer = null;
 +        try (StreamCompressionInputStream streamCompressionInputStream = new StreamCompressionInputStream(inputPlus, StreamMessage.CURRENT_VERSION))
 +        {
 +            TrackedDataInputPlus in = new TrackedDataInputPlus(streamCompressionInputStream);
 +            deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
 +            writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format);
 +            while (in.getBytesRead() < totalSize)
 +            {
 +                writePartition(deserializer, writer);
 +                // TODO move this to BytesReadTracker
 +                session.progress(writer.getFilename(), ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
 +            }
 +            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
 +                         session.planId(), fileSeqNum, session.peer, FBUtilities.prettyPrintMemory(in.getBytesRead()), FBUtilities.prettyPrintMemory(totalSize));
 +            return writer;
 +        }
 +        catch (Throwable e)
 +        {
 +            Object partitionKey = deserializer != null ? deserializer.partitionKey() : "";
 +            logger.warn("[Stream #{}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
 +                        session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName(), e);
 +            if (writer != null)
 +            {
 +                writer.abort(e);
 +            }
 +            throw Throwables.propagate(e);
 +        }
 +    }
 +
 +    protected SerializationHeader getHeader(TableMetadata metadata)
 +    {
 +        return header != null ? header.toHeader(metadata) : null; // pre-3.0 sstables have no SerializationHeader
 +    }
 +    @SuppressWarnings("resource")
 +    protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, UUID pendingRepair, SSTableFormat.Type format) throws IOException
 +    {
 +        Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
 +        if (localDir == null)
 +            throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize)));
 +
 +        StreamReceiver streamReceiver = session.getAggregator(tableId);
 +        Preconditions.checkState(streamReceiver instanceof CassandraStreamReceiver);
-         LifecycleTransaction txn = CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).getTransaction();
++        LifecycleNewTracker lifecycleNewTracker = CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).createLifecycleNewTracker();
 +
-         RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, false, format, sstableLevel, totalSize, txn, getHeader(cfs.metadata()));
++        RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, false, format, sstableLevel, totalSize, lifecycleNewTracker, getHeader(cfs.metadata()));
 +        return writer;
 +    }
 +
 +    protected long totalSize()
 +    {
 +        long size = 0;
 +        for (SSTableReader.PartitionPositionBounds section : sections)
 +            size += section.upperPosition - section.lowerPosition;
 +        return size;
 +    }
 +
 +    protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer) throws IOException
 +    {
 +        writer.append(deserializer.newPartition());
 +        deserializer.checkForExceptions();
 +    }
 +
 +    public static class StreamDeserializer extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator
 +    {
 +        private final TableMetadata metadata;
 +        private final DataInputPlus in;
 +        private final SerializationHeader header;
 +        private final SerializationHelper helper;
 +
 +        private DecoratedKey key;
 +        private DeletionTime partitionLevelDeletion;
 +        private SSTableSimpleIterator iterator;
 +        private Row staticRow;
 +        private IOException exception;
 +
 +        public StreamDeserializer(TableMetadata metadata, DataInputPlus in, Version version, SerializationHeader header) throws IOException
 +        {
 +            this.metadata = metadata;
 +            this.in = in;
 +            this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE);
 +            this.header = header;
 +        }
 +
 +        public StreamDeserializer newPartition() throws IOException
 +        {
 +            key = metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
 +            partitionLevelDeletion = DeletionTime.serializer.deserialize(in);
 +            iterator = SSTableSimpleIterator.create(metadata, in, header, helper, partitionLevelDeletion);
 +            staticRow = iterator.readStaticRow();
 +            return this;
 +        }
 +
 +        public TableMetadata metadata()
 +        {
 +            return metadata;
 +        }
 +
 +        public RegularAndStaticColumns columns()
 +        {
 +            // We don't know which columns we'll get so assume it can be all of them
 +            return metadata.regularAndStaticColumns();
 +        }
 +
 +        public boolean isReverseOrder()
 +        {
 +            return false;
 +        }
 +
 +        public DecoratedKey partitionKey()
 +        {
 +            return key;
 +        }
 +
 +        public DeletionTime partitionLevelDeletion()
 +        {
 +            return partitionLevelDeletion;
 +        }
 +
 +        public Row staticRow()
 +        {
 +            return staticRow;
 +        }
 +
 +        public EncodingStats stats()
 +        {
 +            return header.stats();
 +        }
 +
 +        public boolean hasNext()
 +        {
 +            try
 +            {
 +                return iterator.hasNext();
 +            }
 +            catch (IOError e)
 +            {
 +                if (e.getCause() != null && e.getCause() instanceof IOException)
 +                {
 +                    exception = (IOException)e.getCause();
 +                    return false;
 +                }
 +                throw e;
 +            }
 +        }
 +
 +        public Unfiltered next()
 +        {
 +            // Note that in practice we know that any IOException will be thrown by hasNext(), because that's
 +            // where the actual reading happens, so we don't bother catching RuntimeException here (unlike
 +            // in hasNext())
 +            Unfiltered unfiltered = iterator.next();
 +            return metadata.isCounter() && unfiltered.kind() == Unfiltered.Kind.ROW
 +                   ? maybeMarkLocalToBeCleared((Row) unfiltered)
 +                   : unfiltered;
 +        }
 +
 +        private Row maybeMarkLocalToBeCleared(Row row)
 +        {
 +            return metadata.isCounter() ? row.markCounterLocalToBeCleared() : row;
 +        }
 +
 +        public void checkForExceptions() throws IOException
 +        {
 +            if (exception != null)
 +                throw exception;
 +        }
 +
 +        public void close()
 +        {
 +        }
 +    }
 +}

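One idiom in StreamDeserializer above deserves a note: Iterator.hasNext() cannot throw checked exceptions, so when the underlying read surfaces an IOException wrapped in an IOError, hasNext() unwraps it, stashes it, and reports end-of-data; the read loop then rethrows it as a checked exception via checkForExceptions(). A self-contained sketch of the same idiom, with stand-in names rather than Cassandra API:

    import java.io.IOError;
    import java.io.IOException;
    import java.util.Iterator;

    final class DeferredIoCursor
    {
        private final Iterator<String> source; // stands in for SSTableSimpleIterator
        private IOException deferred;

        DeferredIoCursor(Iterator<String> source)
        {
            this.source = source;
        }

        boolean hasNext()
        {
            if (deferred != null)
                return false; // already failed: keep reporting end-of-data
            try
            {
                return source.hasNext();
            }
            catch (IOError e)
            {
                if (e.getCause() instanceof IOException)
                {
                    deferred = (IOException) e.getCause();
                    return false;
                }
                throw e; // not an IO problem: propagate unchanged
            }
        }

        String next()
        {
            return source.next();
        }

        void checkForExceptions() throws IOException
        {
            if (deferred != null)
                throw deferred; // rethrown on the caller's terms, as a checked exception
        }
    }
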
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
index bb5531e,0000000..b2b2ce5
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
@@@ -1,249 -1,0 +1,280 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db.streaming;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.List;
 +import java.util.Set;
 +
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.Iterables;
 +
++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
++import org.apache.cassandra.io.sstable.SSTable;
++import org.apache.cassandra.streaming.StreamReceiveTask;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.ThrottledUnfilteredIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.view.View;
 +import org.apache.cassandra.dht.Bounds;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.streaming.IncomingStream;
 +import org.apache.cassandra.streaming.StreamReceiver;
 +import org.apache.cassandra.streaming.StreamSession;
 +import org.apache.cassandra.utils.CloseableIterator;
 +import org.apache.cassandra.utils.Throwables;
 +import org.apache.cassandra.utils.concurrent.Refs;
 +
 +public class CassandraStreamReceiver implements StreamReceiver
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CassandraStreamReceiver.class);
 +
 +    private static final int MAX_ROWS_PER_BATCH = Integer.getInteger("cassandra.repair.mutation_repair_rows_per_batch", 100);
 +
 +    private final ColumnFamilyStore cfs;
 +    private final StreamSession session;
 +
 +    // Transaction tracking new files received
 +    private final LifecycleTransaction txn;
 +
 +    // holds references to SSTables received
 +    protected Collection<SSTableReader> sstables;
 +
 +    private final boolean requiresWritePath;
 +
 +
 +    public CassandraStreamReceiver(ColumnFamilyStore cfs, StreamSession session, int totalFiles)
 +    {
 +        this.cfs = cfs;
 +        this.session = session;
 +        // this is an "offline" transaction, as we currently manually expose the sstables once done;
 +        // this should be revisited at a later date, so that LifecycleTransaction manages all sstable state changes
 +        this.txn = LifecycleTransaction.offline(OperationType.STREAM);
 +        this.sstables = new ArrayList<>(totalFiles);
 +        this.requiresWritePath = requiresWritePath(cfs);
 +    }
 +
-     public LifecycleTransaction getTransaction()
-     {
-         return txn;
-     }
- 
 +    public static CassandraStreamReceiver fromReceiver(StreamReceiver receiver)
 +    {
 +        Preconditions.checkArgument(receiver instanceof CassandraStreamReceiver);
 +        return (CassandraStreamReceiver) receiver;
 +    }
 +
 +    private static CassandraIncomingFile getFile(IncomingStream stream)
 +    {
 +        Preconditions.checkArgument(stream instanceof CassandraIncomingFile, "Wrong stream type: {}", stream);
 +        return (CassandraIncomingFile) stream;
 +    }
 +
 +    @Override
 +    @SuppressWarnings("resource")
-     public void received(IncomingStream stream)
++    public synchronized void received(IncomingStream stream)
 +    {
 +        CassandraIncomingFile file = getFile(stream);
 +
 +        Collection<SSTableReader> finished = null;
 +        SSTableMultiWriter sstable = file.getSSTable();
 +        try
 +        {
 +            finished = sstable.finish(true);
 +        }
 +        catch (Throwable t)
 +        {
 +            Throwables.maybeFail(sstable.abort(t));
 +        }
 +        txn.update(finished, false);
 +        sstables.addAll(finished);
 +    }
 +
 +    @Override
 +    public void discardStream(IncomingStream stream)
 +    {
 +        CassandraIncomingFile file = getFile(stream);
 +        Throwables.maybeFail(file.getSSTable().abort(null));
 +    }
 +
++    /**
++     * @return a LifecycleNewTracker whose operations are synchronised on this CassandraStreamReceiver.
++     */
++    public synchronized LifecycleNewTracker createLifecycleNewTracker()
++    {
++        return new LifecycleNewTracker()
++        {
++            @Override
++            public void trackNew(SSTable table)
++            {
++                synchronized (CassandraStreamReceiver.this)
++                {
++                    txn.trackNew(table);
++                }
++            }
++
++            @Override
++            public void untrackNew(SSTable table)
++            {
++                synchronized (CassandraStreamReceiver.this)
++                {
++                    txn.untrackNew(table);
++                }
++            }
++
++            public OperationType opType()
++            {
++                return txn.opType();
++            }
++        };
++    }
++
++
 +    @Override
-     public void abort()
++    public synchronized void abort()
 +    {
 +        sstables.clear();
 +        txn.abort();
 +    }
 +
 +    private boolean hasViews(ColumnFamilyStore cfs)
 +    {
 +        return !Iterables.isEmpty(View.findAll(cfs.metadata.keyspace, cfs.getTableName()));
 +    }
 +
 +    private boolean hasCDC(ColumnFamilyStore cfs)
 +    {
 +        return cfs.metadata().params.cdc;
 +    }
 +
 +    /*
 +     * We have a special path for views and for CDC.
 +     *
 +     * For views, since the view requires cleaning up any pre-existing state, we must put all partitions
 +     * through the same write path as normal mutations. This also ensures that any 2is (secondary indexes) are updated.
 +     *
 +     * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they
 +     * can be archived by the CDC process on discard.
 +     */
 +    private boolean requiresWritePath(ColumnFamilyStore cfs) {
 +        return hasCDC(cfs) || (session.streamOperation().requiresViewBuild() && hasViews(cfs));
 +    }
 +
 +    private void sendThroughWritePath(ColumnFamilyStore cfs, Collection<SSTableReader> readers) {
 +        boolean hasCdc = hasCDC(cfs);
 +        ColumnFilter filter = ColumnFilter.all(cfs.metadata());
 +        for (SSTableReader reader : readers)
 +        {
 +            Keyspace ks = Keyspace.open(reader.getKeyspaceName());
 +            // When doing mutation-based repair we split each partition into smaller batches
 +            // ({@link Stream MAX_ROWS_PER_BATCH}) to avoid OOMing and generating heap pressure
 +            try (ISSTableScanner scanner = reader.getScanner();
 +                 CloseableIterator<UnfilteredRowIterator> throttledPartitions = ThrottledUnfilteredIterator.throttle(scanner, MAX_ROWS_PER_BATCH))
 +            {
 +                while (throttledPartitions.hasNext())
 +                {
 +                    // MV updates *can* be applied unsafely if there's no CDC on the CFS, as we flush
 +                    // before the transaction is done.
 +                    //
 +                    // If the CFS has CDC, however, these updates need to be written to the CommitLog
 +                    // so they get archived into the cdc_raw folder
 +                    ks.apply(new Mutation(PartitionUpdate.fromIterator(throttledPartitions.next(), filter)),
 +                             hasCdc,
 +                             true,
 +                             false);
 +                }
 +            }
 +        }
 +    }
 +
-     private synchronized void finishTransaction()
++    public synchronized void finishTransaction()
 +    {
 +        txn.finish();
 +    }
 +
 +    @Override
 +    public void finished()
 +    {
 +        boolean requiresWritePath = requiresWritePath(cfs);
 +        Collection<SSTableReader> readers = sstables;
 +
 +        try (Refs<SSTableReader> refs = Refs.ref(readers))
 +        {
 +            if (requiresWritePath)
 +            {
 +                sendThroughWritePath(cfs, readers);
 +            }
 +            else
 +            {
 +                finishTransaction();
 +
 +                // add sstables (this will build secondary indexes too, see 
CASSANDRA-10130)
 +                logger.debug("[Stream #{}] Received {} sstables from {} ({})", session.planId(), readers.size(), session.peer, readers);
 +                cfs.addSSTables(readers);
 +
 +                //invalidate row and counter cache
 +                if (cfs.isRowCacheEnabled() || cfs.metadata().isCounter())
 +                {
 +                    List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
 +                    readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
 +                    Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
 +
 +                    if (cfs.isRowCacheEnabled())
 +                    {
 +                        int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
 +                        if (invalidatedKeys > 0)
 +                            logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
 +                                         "receive task completed.", session.planId(), invalidatedKeys,
 +                                         cfs.keyspace.getName(), cfs.getTableName());
 +                    }
 +
 +                    if (cfs.metadata().isCounter())
 +                    {
 +                        int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
 +                        if (invalidatedKeys > 0)
 +                            logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
 +                                         "receive task completed.", session.planId(), invalidatedKeys,
 +                                         cfs.keyspace.getName(), cfs.getTableName());
 +                    }
 +                }
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public void cleanup()
 +    {
 +        // We don't keep the streamed sstables since we've applied them manually so we abort the txn and delete
 +        // the streamed sstables.
 +        if (requiresWritePath)
 +        {
 +            cfs.forceBlockingFlush();
 +            abort();
 +        }
 +    }
 +}

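The CASSANDRA-14554 fix is easiest to see in this file: received(), abort() and finishTransaction() now take the receiver's monitor, and writers never touch the LifecycleTransaction directly; they get a LifecycleNewTracker whose methods synchronise on the same receiver, so every mutation of the shared, non-thread-safe transaction is serialised under one lock even when several sstable writers run concurrently. A stripped-down sketch of that locking shape, using stand-in types rather than the committed code:

    import java.util.ArrayList;
    import java.util.List;

    final class SynchronisedReceiver
    {
        interface Tracker
        {
            void trackNew(String sstable);
            void untrackNew(String sstable);
        }

        // Stands in for the offline LifecycleTransaction: not thread-safe on its own.
        private final List<String> txn = new ArrayList<>();

        synchronized void received(String finished)   // session thread
        {
            txn.add(finished);
        }

        synchronized void abort()                     // session thread
        {
            txn.clear();
        }

        Tracker createTracker()                       // handed to each writer thread
        {
            return new Tracker()
            {
                @Override
                public void trackNew(String sstable)
                {
                    synchronized (SynchronisedReceiver.this) // same monitor as received()/abort()
                    {
                        txn.add(sstable);
                    }
                }

                @Override
                public void untrackNew(String sstable)
                {
                    synchronized (SynchronisedReceiver.this)
                    {
                        txn.remove(sstable);
                    }
                }
            };
        }
    }
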
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index eb5c5fe,76e4dbb..a84f07e
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@@ -21,9 -22,10 +21,9 @@@ import java.util.Collection
  import java.util.Collections;
  import java.util.UUID;
  
 -import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.db.RowIndexEntry;
  import org.apache.cassandra.db.SerializationHeader;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
  import org.apache.cassandra.db.rows.UnfilteredRowIterator;
  import org.apache.cassandra.index.Index;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
@@@ -116,9 -114,9 +116,9 @@@ public class SimpleSSTableMultiWriter i
                                             MetadataCollector metadataCollector,
                                             SerializationHeader header,
                                             Collection<Index> indexes,
-                                             LifecycleTransaction txn)
+                                             LifecycleNewTracker lifecycleNewTracker)
      {
-         SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, indexes, txn);
-         return new SimpleSSTableMultiWriter(writer, txn);
 -        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, indexes, lifecycleNewTracker);
++        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, indexes, lifecycleNewTracker);
+         return new SimpleSSTableMultiWriter(writer, lifecycleNewTracker);
      }
  }

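The change to SimpleSSTableMultiWriter is a pure parameter-type widening: create() accepts any LifecycleNewTracker where it previously demanded a concrete LifecycleTransaction. Existing callers keep compiling only if LifecycleTransaction itself implements LifecycleNewTracker, an assumption consistent with the unchanged call sites elsewhere in this commit. A toy illustration of the substitution, with hypothetical names:

    interface NewTracker
    {
        void trackNew(Object sstable);
    }

    final class Txn implements NewTracker // assumed relationship, mirroring LifecycleTransaction
    {
        public void trackNew(Object sstable) { /* record for cleanup on abort */ }
    }

    final class WriterSketch
    {
        WriterSketch(NewTracker tracker, String descriptor) // was (Txn txn, ...): widening keeps old callers compiling
        {
            tracker.trackNew(descriptor); // register before any file is written
        }
    }

    // Old call site still compiles: new WriterSketch(new Txn(), "desc");
    // Streaming can now pass its synchronised tracker facade instead.
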
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
index 29fa573,3358225..ef4deb7
--- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@@ -43,18 -42,16 +43,18 @@@ public class RangeAwareSSTableWriter im
      private final int sstableLevel;
      private final long estimatedKeys;
      private final long repairedAt;
 +    private final UUID pendingRepair;
 +    private final boolean isTransient;
      private final SSTableFormat.Type format;
      private final SerializationHeader header;
-     private final LifecycleTransaction txn;
+     private final LifecycleNewTracker lifecycleNewTracker;
      private int currentIndex = -1;
      public final ColumnFamilyStore cfs;
      private final List<SSTableMultiWriter> finishedWriters = new ArrayList<>();
      private final List<SSTableReader> finishedReaders = new ArrayList<>();
      private SSTableMultiWriter currentWriter = null;
  
-     public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, UUID pendingRepair, boolean isTransient, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
 -    public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleNewTracker lifecycleNewTracker, SerializationHeader header) throws IOException
++    public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, UUID pendingRepair, boolean isTransient, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleNewTracker lifecycleNewTracker, SerializationHeader header) throws IOException
      {
          DiskBoundaries db = cfs.getDiskBoundaries();
          directories = db.directories;
@@@ -62,10 -59,8 +62,10 @@@
          this.cfs = cfs;
          this.estimatedKeys = estimatedKeys / directories.size();
          this.repairedAt = repairedAt;
 +        this.pendingRepair = pendingRepair;
 +        this.isTransient = isTransient;
          this.format = format;
-         this.txn = txn;
+         this.lifecycleNewTracker = lifecycleNewTracker;
          this.header = header;
          boundaries = db.positions;
          if (boundaries == null)
@@@ -74,8 -69,8 +74,8 @@@
              if (localDir == null)
                  throw new IOException(String.format("Insufficient disk space to store %s",
                                                      FBUtilities.prettyPrintMemory(totalSize)));
 -            Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
 -            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, lifecycleNewTracker);
 +            Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(localDir), format);
-             currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, isTransient, sstableLevel, header, txn);
++            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, isTransient, sstableLevel, header, lifecycleNewTracker);
          }
      }
  
@@@ -96,8 -91,8 +96,8 @@@
              if (currentWriter != null)
                  finishedWriters.add(currentWriter);
  
 -            Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex))), format);
 -            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, lifecycleNewTracker);
 +            Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex)), format);
-             currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, isTransient, sstableLevel, header, txn);
++            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, isTransient, sstableLevel, header, lifecycleNewTracker);
          }
      }
  

