Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/33eada06 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/33eada06 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/33eada06 Branch: refs/heads/trunk Commit: 33eada06a6dd3529da644377dba180795f522176 Parents: 4dd7faa 732c43b Author: Benedict Elliott Smith <bened...@apache.org> Authored: Mon Dec 10 15:07:18 2018 +0000 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Mon Dec 10 15:07:18 2018 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 8 ++-- .../compaction/AbstractCompactionStrategy.java | 5 ++- .../db/compaction/AbstractStrategyHolder.java | 3 +- .../db/compaction/CompactionStrategyHolder.java | 5 ++- .../compaction/CompactionStrategyManager.java | 5 ++- .../db/compaction/PendingRepairHolder.java | 5 ++- .../db/lifecycle/LifecycleNewTracker.java | 47 ++++++++++++++++++++ .../db/lifecycle/LifecycleTransaction.java | 7 ++- .../apache/cassandra/db/lifecycle/LogFile.java | 24 ++++------ .../cassandra/db/lifecycle/LogTransaction.java | 4 +- .../CassandraEntireSSTableStreamReader.java | 5 ++- .../db/streaming/CassandraStreamReader.java | 5 ++- .../db/streaming/CassandraStreamReceiver.java | 47 ++++++++++++++++---- .../io/sstable/SimpleSSTableMultiWriter.java | 16 +++---- .../sstable/format/RangeAwareSSTableWriter.java | 12 ++--- .../io/sstable/format/SSTableWriter.java | 20 ++++----- .../io/sstable/format/big/BigFormat.java | 6 +-- .../io/sstable/format/big/BigTableWriter.java | 6 +-- .../format/big/BigTableZeroCopyWriter.java | 5 ++- .../locator/AbstractReplicationStrategy.java | 2 +- .../cassandra/streaming/StreamReceiveTask.java | 2 +- .../unit/org/apache/cassandra/db/ScrubTest.java | 5 ++- 23 files changed, 166 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 75b05ea,20da1fa..b8410b8 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -338,7 -2,9 +338,8 @@@ * Make stop-server.bat wait for Cassandra to terminate (CASSANDRA-14829) * Correct sstable sorting for garbagecollect and levelled compaction (CASSANDRA-14870) Merged from 3.0: + * Streaming needs to synchronise access to LifecycleTransaction (CASSANDRA-14554) * Fix cassandra-stress write hang with default options (CASSANDRA-14616) - * Differentiate between slices and RTs when decoding legacy bounds (CASSANDRA-14919) * Netty epoll IOExceptions caused by unclean client disconnects being logged at INFO (CASSANDRA-14909) * Unfiltered.isEmpty conflicts with Row extends AbstractCollection.isEmpty (CASSANDRA-14588) * RangeTombstoneList doesn't properly clean up mergeable or superseded rts in some cases (CASSANDRA-14894) http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index c5149cf,700c1cc..cc6af6f --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -497,15 -507,15 +497,15 @@@ public class ColumnFamilyStore implemen return directories; } - public SSTableMultiWriter 
createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, int sstableLevel, SerializationHeader header, LifecycleTransaction txn) - public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) ++ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) { - MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel); - return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker); + MetadataCollector collector = new MetadataCollector(metadata().comparator).sstableLevel(sstableLevel); - return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, collector, header, txn); ++ return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, collector, header, lifecycleNewTracker); } - public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn) - public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) ++ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, MetadataCollector metadataCollector, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) { - return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadataCollector, header, indexManager.listIndexes(), txn); - return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, indexManager.listIndexes(), lifecycleNewTracker); ++ return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadataCollector, header, indexManager.listIndexes(), lifecycleNewTracker); } public boolean supportsEarlyOpen() http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 28ea90a,3d7800d..9d7825c --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@@ -534,9 -583,9 +535,9 @@@ public abstract class AbstractCompactio MetadataCollector meta, SerializationHeader header, Collection<Index> indexes, - LifecycleTransaction txn) + LifecycleNewTracker lifecycleNewTracker) { - return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, cfs.metadata, meta, header, indexes, txn); - return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, indexes, lifecycleNewTracker); ++ return 
SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, cfs.metadata, meta, header, indexes, lifecycleNewTracker); } public boolean supportsEarlyOpen() http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java index 24bea06,0000000..63b7909 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java @@@ -1,209 -1,0 +1,210 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compaction; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.function.Supplier; + +import com.google.common.base.Preconditions; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.SerializationHeader; ++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.schema.CompactionParams; + +/** + * Wrapper that's aware of how sstables are divided between separate strategies, + * and provides a standard interface to them + * + * not threadsafe, calls must be synchronized by caller + */ +public abstract class AbstractStrategyHolder +{ + public static class TaskSupplier implements Comparable<TaskSupplier> + { + private final int numRemaining; + private final Supplier<AbstractCompactionTask> supplier; + + TaskSupplier(int numRemaining, Supplier<AbstractCompactionTask> supplier) + { + this.numRemaining = numRemaining; + this.supplier = supplier; + } + + public AbstractCompactionTask getTask() + { + return supplier.get(); + } + + public int compareTo(TaskSupplier o) + { + return o.numRemaining - numRemaining; + } + } + + public static interface DestinationRouter + { + int getIndexForSSTable(SSTableReader sstable); + int getIndexForSSTableDirectory(Descriptor descriptor); + } + + /** + * Maps sstables to their token partition bucket + */ + static class GroupedSSTableContainer + { + private 
final AbstractStrategyHolder holder; + private final Set<SSTableReader>[] groups; + + private GroupedSSTableContainer(AbstractStrategyHolder holder) + { + this.holder = holder; + Preconditions.checkArgument(holder.numTokenPartitions > 0, "numTokenPartitions not set"); + groups = new Set[holder.numTokenPartitions]; + } + + void add(SSTableReader sstable) + { + Preconditions.checkArgument(holder.managesSSTable(sstable), "this strategy holder doesn't manage %s", sstable); + int idx = holder.router.getIndexForSSTable(sstable); + Preconditions.checkState(idx >= 0 && idx < holder.numTokenPartitions, "Invalid sstable index (%s) for %s", idx, sstable); + if (groups[idx] == null) + groups[idx] = new HashSet<>(); + groups[idx].add(sstable); + } + + int numGroups() + { + return groups.length; + } + + Set<SSTableReader> getGroup(int i) + { + Preconditions.checkArgument(i >= 0 && i < groups.length); + Set<SSTableReader> group = groups[i]; + return group != null ? group : Collections.emptySet(); + } + + boolean isGroupEmpty(int i) + { + return getGroup(i).isEmpty(); + } + + boolean isEmpty() + { + for (int i = 0; i < groups.length; i++) + if (!isGroupEmpty(i)) + return false; + return true; + } + } + + protected final ColumnFamilyStore cfs; + final DestinationRouter router; + private int numTokenPartitions = -1; + + AbstractStrategyHolder(ColumnFamilyStore cfs, DestinationRouter router) + { + this.cfs = cfs; + this.router = router; + } + + public abstract void startup(); + + public abstract void shutdown(); + + final void setStrategy(CompactionParams params, int numTokenPartitions) + { + Preconditions.checkArgument(numTokenPartitions > 0, "at least one token partition required"); + shutdown(); + this.numTokenPartitions = numTokenPartitions; + setStrategyInternal(params, numTokenPartitions); + } + + protected abstract void setStrategyInternal(CompactionParams params, int numTokenPartitions); + + /** + * SSTables are grouped by their repaired and pending repair status. This method determines if this holder + * holds the sstable for the given repaired/grouped statuses. Holders should be mutually exclusive in the + * groups they deal with. IOW, if one holder returns true for a given isRepaired/isPendingRepair combo, + * none of the others should. 
+ */ + public abstract boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair, boolean isTransient); + + public boolean managesSSTable(SSTableReader sstable) + { + return managesRepairedGroup(sstable.isRepaired(), sstable.isPendingRepair(), sstable.isTransient()); + } + + public abstract AbstractCompactionStrategy getStrategyFor(SSTableReader sstable); + + public abstract Iterable<AbstractCompactionStrategy> allStrategies(); + + public abstract Collection<TaskSupplier> getBackgroundTaskSuppliers(int gcBefore); + + public abstract Collection<AbstractCompactionTask> getMaximalTasks(int gcBefore, boolean splitOutput); + + public abstract Collection<AbstractCompactionTask> getUserDefinedTasks(GroupedSSTableContainer sstables, int gcBefore); + + public GroupedSSTableContainer createGroupedSSTableContainer() + { + return new GroupedSSTableContainer(this); + } + + public abstract void addSSTables(GroupedSSTableContainer sstables); + + public abstract void removeSSTables(GroupedSSTableContainer sstables); + + public abstract void replaceSSTables(GroupedSSTableContainer removed, GroupedSSTableContainer added); + + public abstract List<ISSTableScanner> getScanners(GroupedSSTableContainer sstables, Collection<Range<Token>> ranges); + + + public abstract SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, + long keyCount, + long repairedAt, + UUID pendingRepair, + boolean isTransient, + MetadataCollector collector, + SerializationHeader header, + Collection<Index> indexes, - LifecycleTransaction txn); ++ LifecycleNewTracker lifecycleNewTracker); + + /** + * Return the directory index the given compaction strategy belongs to, or -1 + * if it's not held by this holder + */ + public abstract int getStrategyIndex(AbstractCompactionStrategy strategy); + + public abstract boolean containsSSTable(SSTableReader sstable); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java index 8ce93fa,0000000..129ee79 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java @@@ -1,264 -1,0 +1,265 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.compaction; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.SerializationHeader; ++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.schema.CompactionParams; +import org.apache.cassandra.service.ActiveRepairService; + +public class CompactionStrategyHolder extends AbstractStrategyHolder +{ + private final List<AbstractCompactionStrategy> strategies = new ArrayList<>(); + private final boolean isRepaired; + + public CompactionStrategyHolder(ColumnFamilyStore cfs, DestinationRouter router, boolean isRepaired) + { + super(cfs, router); + this.isRepaired = isRepaired; + } + + @Override + public void startup() + { + strategies.forEach(AbstractCompactionStrategy::startup); + } + + @Override + public void shutdown() + { + strategies.forEach(AbstractCompactionStrategy::shutdown); + } + + @Override + public void setStrategyInternal(CompactionParams params, int numTokenPartitions) + { + strategies.clear(); + for (int i = 0; i < numTokenPartitions; i++) + strategies.add(cfs.createCompactionStrategyInstance(params)); + } + + @Override + public boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair, boolean isTransient) + { + if (!isPendingRepair) + { + Preconditions.checkArgument(!isTransient, "isTransient can only be true for sstables pending repairs"); + return this.isRepaired == isRepaired; + } + else + { + Preconditions.checkArgument(!isRepaired, "SSTables cannot be both repaired and pending repair"); + return false; + + } + } + + @Override + public AbstractCompactionStrategy getStrategyFor(SSTableReader sstable) + { + Preconditions.checkArgument(managesSSTable(sstable), "Attempting to get compaction strategy from wrong holder"); + return strategies.get(router.getIndexForSSTable(sstable)); + } + + @Override + public Iterable<AbstractCompactionStrategy> allStrategies() + { + return strategies; + } + + @Override + public Collection<TaskSupplier> getBackgroundTaskSuppliers(int gcBefore) + { + List<TaskSupplier> suppliers = new ArrayList<>(strategies.size()); + for (AbstractCompactionStrategy strategy : strategies) + suppliers.add(new TaskSupplier(strategy.getEstimatedRemainingTasks(), () -> strategy.getNextBackgroundTask(gcBefore))); + + return suppliers; + } + + @Override + public Collection<AbstractCompactionTask> getMaximalTasks(int gcBefore, boolean splitOutput) + { + List<AbstractCompactionTask> tasks = new ArrayList<>(strategies.size()); + for (AbstractCompactionStrategy strategy : strategies) + { + Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput); + if (task != null) + tasks.addAll(task); + } + return tasks; + } + + @Override + public Collection<AbstractCompactionTask> getUserDefinedTasks(GroupedSSTableContainer sstables, int gcBefore) + { + 
List<AbstractCompactionTask> tasks = new ArrayList<>(strategies.size()); + for (int i = 0; i < strategies.size(); i++) + { + if (sstables.isGroupEmpty(i)) + continue; + + tasks.add(strategies.get(i).getUserDefinedTask(sstables.getGroup(i), gcBefore)); + } + return tasks; + } + + @Override + public void addSSTables(GroupedSSTableContainer sstables) + { + Preconditions.checkArgument(sstables.numGroups() == strategies.size()); + for (int i = 0; i < strategies.size(); i++) + { + if (!sstables.isGroupEmpty(i)) + strategies.get(i).addSSTables(sstables.getGroup(i)); + } + } + + @Override + public void removeSSTables(GroupedSSTableContainer sstables) + { + Preconditions.checkArgument(sstables.numGroups() == strategies.size()); + for (int i = 0; i < strategies.size(); i++) + { + if (!sstables.isGroupEmpty(i)) + strategies.get(i).removeSSTables(sstables.getGroup(i)); + } + } + + @Override + public void replaceSSTables(GroupedSSTableContainer removed, GroupedSSTableContainer added) + { + Preconditions.checkArgument(removed.numGroups() == strategies.size()); + Preconditions.checkArgument(added.numGroups() == strategies.size()); + for (int i = 0; i < strategies.size(); i++) + { + if (removed.isGroupEmpty(i) && added.isGroupEmpty(i)) + continue; + + if (removed.isGroupEmpty(i)) + strategies.get(i).addSSTables(added.getGroup(i)); + else + strategies.get(i).replaceSSTables(removed.getGroup(i), added.getGroup(i)); + } + } + + public AbstractCompactionStrategy first() + { + return strategies.get(0); + } + + @Override + @SuppressWarnings("resource") + public List<ISSTableScanner> getScanners(GroupedSSTableContainer sstables, Collection<Range<Token>> ranges) + { + List<ISSTableScanner> scanners = new ArrayList<>(strategies.size()); + for (int i = 0; i < strategies.size(); i++) + { + if (sstables.isGroupEmpty(i)) + continue; + + scanners.addAll(strategies.get(i).getScanners(sstables.getGroup(i), ranges).scanners); + } + return scanners; + } + + Collection<Collection<SSTableReader>> groupForAnticompaction(Iterable<SSTableReader> sstables) + { + Preconditions.checkState(!isRepaired); + GroupedSSTableContainer group = createGroupedSSTableContainer(); + sstables.forEach(group::add); + + Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>(); + for (int i = 0; i < strategies.size(); i++) + { + if (group.isGroupEmpty(i)) + continue; + + anticompactionGroups.addAll(strategies.get(i).groupSSTablesForAntiCompaction(group.getGroup(i))); + } + + return anticompactionGroups; + } + + @Override + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, + long keyCount, + long repairedAt, + UUID pendingRepair, + boolean isTransient, + MetadataCollector collector, + SerializationHeader header, + Collection<Index> indexes, - LifecycleTransaction txn) ++ LifecycleNewTracker lifecycleNewTracker) + { + if (isRepaired) + { + Preconditions.checkArgument(repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE, + "Repaired CompactionStrategyHolder can't create unrepaired sstable writers"); + } + else + { + Preconditions.checkArgument(repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE, + "Unrepaired CompactionStrategyHolder can't create repaired sstable writers"); + } + Preconditions.checkArgument(pendingRepair == null, + "CompactionStrategyHolder can't create sstable writer with pendingRepair id"); + // to avoid creating a compaction strategy for the wrong pending repair manager, we get the index based on where the sstable is to be written + AbstractCompactionStrategy strategy = 
strategies.get(router.getIndexForSSTableDirectory(descriptor)); + return strategy.createSSTableMultiWriter(descriptor, + keyCount, + repairedAt, + pendingRepair, + isTransient, + collector, + header, + indexes, - txn); ++ lifecycleNewTracker); + } + + @Override + public int getStrategyIndex(AbstractCompactionStrategy strategy) + { + return strategies.indexOf(strategy); + } + + @Override + public boolean containsSSTable(SSTableReader sstable) + { + return Iterables.any(strategies, acs -> acs.getSSTables().contains(sstable)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index 45ccbe2,86170a1..b978641 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@@ -33,12 -23,13 +33,13 @@@ import java.util.concurrent.Callable import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; import java.util.stream.Collectors; -import java.util.stream.Stream; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.primitives.Ints; - +import com.google.common.collect.Lists; +import com.google.common.primitives.Longs; + import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -1117,22 -1016,20 +1118,22 @@@ public class CompactionStrategyManager MetadataCollector collector, SerializationHeader header, Collection<Index> indexes, - LifecycleTransaction txn) + LifecycleNewTracker lifecycleNewTracker) { + SSTable.validateRepairedMetadata(repairedAt, pendingRepair, isTransient); maybeReloadDiskBoundaries(); readLock.lock(); try { - if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) - { - return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, lifecycleNewTracker); - } - else - { - return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, lifecycleNewTracker); - } + return getHolder(repairedAt, pendingRepair, isTransient).createSSTableMultiWriter(descriptor, + keyCount, + repairedAt, + pendingRepair, + isTransient, + collector, + header, + indexes, - txn); ++ lifecycleNewTracker); } finally { http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java index 92e44a7,0000000..03d4111 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java +++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java @@@ -1,284 -1,0 +1,285 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compaction; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.SerializationHeader; ++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.schema.CompactionParams; +import org.apache.cassandra.service.ActiveRepairService; + +public class PendingRepairHolder extends AbstractStrategyHolder +{ + private final List<PendingRepairManager> managers = new ArrayList<>(); + private final boolean isTransient; + + public PendingRepairHolder(ColumnFamilyStore cfs, DestinationRouter router, boolean isTransient) + { + super(cfs, router); + this.isTransient = isTransient; + } + + @Override + public void startup() + { + managers.forEach(PendingRepairManager::startup); + } + + @Override + public void shutdown() + { + managers.forEach(PendingRepairManager::shutdown); + } + + @Override + public void setStrategyInternal(CompactionParams params, int numTokenPartitions) + { + managers.clear(); + for (int i = 0; i < numTokenPartitions; i++) + managers.add(new PendingRepairManager(cfs, params, isTransient)); + } + + @Override + public boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair, boolean isTransient) + { + Preconditions.checkArgument(!isPendingRepair || !isRepaired, + "SSTables cannot be both repaired and pending repair"); + return isPendingRepair && (this.isTransient == isTransient); + } + + @Override + public AbstractCompactionStrategy getStrategyFor(SSTableReader sstable) + { + Preconditions.checkArgument(managesSSTable(sstable), "Attempting to get compaction strategy from wrong holder"); + return managers.get(router.getIndexForSSTable(sstable)).getOrCreate(sstable); + } + + @Override + public Iterable<AbstractCompactionStrategy> allStrategies() + { + return Iterables.concat(Iterables.transform(managers, PendingRepairManager::getStrategies)); + } + + Iterable<AbstractCompactionStrategy> getStrategiesFor(UUID session) + { + List<AbstractCompactionStrategy> strategies = new ArrayList<>(managers.size()); + for (PendingRepairManager manager : managers) + { + AbstractCompactionStrategy strategy = manager.get(session); + if (strategy != null) + strategies.add(strategy); + } + return strategies; + } + + public Iterable<PendingRepairManager> getManagers() + { + return managers; + } + + @Override + public Collection<TaskSupplier> getBackgroundTaskSuppliers(int 
gcBefore) + { + List<TaskSupplier> suppliers = new ArrayList<>(managers.size()); + for (PendingRepairManager manager : managers) + suppliers.add(new TaskSupplier(manager.getMaxEstimatedRemainingTasks(), () -> manager.getNextBackgroundTask(gcBefore))); + + return suppliers; + } + + @Override + public Collection<AbstractCompactionTask> getMaximalTasks(int gcBefore, boolean splitOutput) + { + List<AbstractCompactionTask> tasks = new ArrayList<>(managers.size()); + for (PendingRepairManager manager : managers) + { + Collection<AbstractCompactionTask> task = manager.getMaximalTasks(gcBefore, splitOutput); + if (task != null) + tasks.addAll(task); + } + return tasks; + } + + @Override + public Collection<AbstractCompactionTask> getUserDefinedTasks(GroupedSSTableContainer sstables, int gcBefore) + { + List<AbstractCompactionTask> tasks = new ArrayList<>(managers.size()); + + for (int i = 0; i < managers.size(); i++) + { + if (sstables.isGroupEmpty(i)) + continue; + + tasks.addAll(managers.get(i).createUserDefinedTasks(sstables.getGroup(i), gcBefore)); + } + return tasks; + } + + AbstractCompactionTask getNextRepairFinishedTask() + { + List<TaskSupplier> repairFinishedSuppliers = getRepairFinishedTaskSuppliers(); + if (!repairFinishedSuppliers.isEmpty()) + { + Collections.sort(repairFinishedSuppliers); + for (TaskSupplier supplier : repairFinishedSuppliers) + { + AbstractCompactionTask task = supplier.getTask(); + if (task != null) + return task; + } + } + return null; + } + + private ArrayList<TaskSupplier> getRepairFinishedTaskSuppliers() + { + ArrayList<TaskSupplier> suppliers = new ArrayList<>(managers.size()); + for (PendingRepairManager manager : managers) + { + int numPending = manager.getNumPendingRepairFinishedTasks(); + if (numPending > 0) + { + suppliers.add(new TaskSupplier(numPending, manager::getNextRepairFinishedTask)); + } + } + + return suppliers; + } + + @Override + public void addSSTables(GroupedSSTableContainer sstables) + { + Preconditions.checkArgument(sstables.numGroups() == managers.size()); + for (int i = 0; i < managers.size(); i++) + { + if (!sstables.isGroupEmpty(i)) + managers.get(i).addSSTables(sstables.getGroup(i)); + } + } + + @Override + public void removeSSTables(GroupedSSTableContainer sstables) + { + Preconditions.checkArgument(sstables.numGroups() == managers.size()); + for (int i = 0; i < managers.size(); i++) + { + if (!sstables.isGroupEmpty(i)) + managers.get(i).removeSSTables(sstables.getGroup(i)); + } + } + + @Override + public void replaceSSTables(GroupedSSTableContainer removed, GroupedSSTableContainer added) + { + Preconditions.checkArgument(removed.numGroups() == managers.size()); + Preconditions.checkArgument(added.numGroups() == managers.size()); + for (int i = 0; i < managers.size(); i++) + { + if (removed.isGroupEmpty(i) && added.isGroupEmpty(i)) + continue; + + if (removed.isGroupEmpty(i)) + managers.get(i).addSSTables(added.getGroup(i)); + else + managers.get(i).replaceSSTables(removed.getGroup(i), added.getGroup(i)); + } + } + + @Override + public List<ISSTableScanner> getScanners(GroupedSSTableContainer sstables, Collection<Range<Token>> ranges) + { + List<ISSTableScanner> scanners = new ArrayList<>(managers.size()); + for (int i = 0; i < managers.size(); i++) + { + if (sstables.isGroupEmpty(i)) + continue; + + scanners.addAll(managers.get(i).getScanners(sstables.getGroup(i), ranges)); + } + return scanners; + } + + @Override + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, + long keyCount, + long repairedAt, + UUID 
pendingRepair, + boolean isTransient, + MetadataCollector collector, + SerializationHeader header, + Collection<Index> indexes, - LifecycleTransaction txn) ++ LifecycleNewTracker lifecycleNewTracker) + { + Preconditions.checkArgument(repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE, + "PendingRepairHolder can't create sstablewriter with repaired at set"); + Preconditions.checkArgument(pendingRepair != null, + "PendingRepairHolder can't create sstable writer without pendingRepair id"); + // to avoid creating a compaction strategy for the wrong pending repair manager, we get the index based on where the sstable is to be written + AbstractCompactionStrategy strategy = managers.get(router.getIndexForSSTableDirectory(descriptor)).getOrCreate(pendingRepair); + return strategy.createSSTableMultiWriter(descriptor, + keyCount, + repairedAt, + pendingRepair, + isTransient, + collector, + header, + indexes, - txn); ++ lifecycleNewTracker); + } + + @Override + public int getStrategyIndex(AbstractCompactionStrategy strategy) + { + for (int i = 0; i < managers.size(); i++) + { + if (managers.get(i).hasStrategy(strategy)) + return i; + } + return -1; + } + + public boolean hasDataForSession(UUID sessionID) + { + return Iterables.any(managers, prm -> prm.hasDataForSession(sessionID)); + } + + @Override + public boolean containsSSTable(SSTableReader sstable) + { + return Iterables.any(managers, prm -> prm.containsSSTable(sstable)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/lifecycle/LogFile.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java index a2c1c32,12531cd..64d6af0 --- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java @@@ -420,7 -420,7 +420,7 @@@ class LogTransaction extends Transactio * for further details on transaction logs. * * This method is called on startup and by the standalone sstableutil tool when the cleanup option is specified, -- * @see StandaloneSSTableUtil. ++ * @see org.apache.cassandra.tools.StandaloneSSTableUtil * * @return true if the leftovers of all transaction logs found were removed, false otherwise. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java index 6f8f06a,0000000..479ee71 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java @@@ -1,177 -1,0 +1,178 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; + +import com.google.common.base.Throwables; ++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.big.BigTableZeroCopyWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; + +import static java.lang.String.format; +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; + +/** + * CassandraEntireSSTableStreamReader reads SSTable off the wire and writes it to disk. + */ +public class CassandraEntireSSTableStreamReader implements IStreamReader +{ + private static final Logger logger = LoggerFactory.getLogger(CassandraEntireSSTableStreamReader.class); + + private final TableId tableId; + private final StreamSession session; + private final CassandraStreamHeader header; + private final int fileSequenceNumber; + + public CassandraEntireSSTableStreamReader(StreamMessageHeader messageHeader, CassandraStreamHeader streamHeader, StreamSession session) + { + if (streamHeader.format != SSTableFormat.Type.BIG) + throw new AssertionError("Unsupported SSTable format " + streamHeader.format); + + if (session.getPendingRepair() != null) + { + // we should only ever be streaming pending repair sstables if the session has a pending repair id + if (!session.getPendingRepair().equals(messageHeader.pendingRepair)) + throw new IllegalStateException(format("Stream Session & SSTable (%s) pendingRepair UUID mismatch.", messageHeader.tableId)); + } + + this.header = streamHeader; + this.session = session; + this.tableId = messageHeader.tableId; + this.fileSequenceNumber = messageHeader.sequenceNumber; + } + + /** + * @param in where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. 
+ */ + @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed + @Override + public SSTableMultiWriter read(DataInputPlus in) throws IOException + { + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); + if (cfs == null) + { + // schema was dropped during streaming + throw new IOException("Table " + tableId + " was dropped during streaming"); + } + + ComponentManifest manifest = header.componentManifest; + long totalSize = manifest.totalSize(); + + logger.debug("[Stream #{}] Started receiving sstable #{} from {}, size = {}, table = {}", + session.planId(), + fileSequenceNumber, + session.peer, + prettyPrintMemory(totalSize), + cfs.metadata()); + + BigTableZeroCopyWriter writer = null; + + try + { + writer = createWriter(cfs, totalSize, manifest.components()); + long bytesRead = 0; + for (Component component : manifest.components()) + { + long length = manifest.sizeOf(component); + + logger.debug("[Stream #{}] Started receiving {} component from {}, componentSize = {}, readBytes = {}, totalSize = {}", + session.planId(), + component, + session.peer, + prettyPrintMemory(length), + prettyPrintMemory(bytesRead), + prettyPrintMemory(totalSize)); + + writer.writeComponent(component.type, in, length); + session.progress(writer.descriptor.filenameFor(component), ProgressInfo.Direction.IN, length, length); + bytesRead += length; + + logger.debug("[Stream #{}] Finished receiving {} component from {}, componentSize = {}, readBytes = {}, totalSize = {}", + session.planId(), + component, + session.peer, + prettyPrintMemory(length), + prettyPrintMemory(bytesRead), + prettyPrintMemory(totalSize)); + } + + writer.descriptor.getMetadataSerializer().mutateLevel(writer.descriptor, header.sstableLevel); + return writer; + } + catch (Throwable e) + { + logger.error("[Stream {}] Error while reading sstable from stream for table = {}", session.planId(), cfs.metadata(), e); + if (writer != null) + e = writer.abort(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } + } + + private File getDataDir(ColumnFamilyStore cfs, long totalSize) throws IOException + { + Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize); + if (localDir == null) + throw new IOException(format("Insufficient disk space to store %s", prettyPrintMemory(totalSize))); + + File dir = cfs.getDirectories().getLocationForDisk(cfs.getDiskBoundaries().getCorrectDiskForKey(header.firstKey)); + + if (dir == null) + return cfs.getDirectories().getDirectoryForNewSSTables(); + + return dir; + } + + @SuppressWarnings("resource") + protected BigTableZeroCopyWriter createWriter(ColumnFamilyStore cfs, long totalSize, Collection<Component> components) throws IOException + { + File dataDir = getDataDir(cfs, totalSize); + + StreamReceiver streamReceiver = session.getAggregator(tableId); + assert streamReceiver instanceof CassandraStreamReceiver; + - LifecycleTransaction txn = CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).getTransaction(); ++ LifecycleNewTracker lifecycleNewTracker = CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).createLifecycleNewTracker(); + + Descriptor desc = cfs.newSSTableDescriptor(dataDir, header.version, header.format); + + logger.debug("[Table #{}] {} Components to write: {}", cfs.metadata(), desc.filenameFor(Component.DATA), components); + - return new BigTableZeroCopyWriter(desc, cfs.metadata, txn, components); ++ return new BigTableZeroCopyWriter(desc, cfs.metadata, 
lifecycleNewTracker, components); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java index 572c648,0000000..43371a9 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java @@@ -1,286 -1,0 +1,287 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.streaming; + +import java.io.*; +import java.util.Collection; +import java.util.UUID; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.UnmodifiableIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.TrackedDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.db.*; ++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.SSTableSimpleIterator; +import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.compress.StreamCompressionInputStream; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.streaming.messages.StreamMessage; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +/** + * CassandraStreamReader reads from stream and writes to SSTable. 
+ */ +public class CassandraStreamReader implements IStreamReader +{ + private static final Logger logger = LoggerFactory.getLogger(CassandraStreamReader.class); + protected final TableId tableId; + protected final long estimatedKeys; + protected final Collection<SSTableReader.PartitionPositionBounds> sections; + protected final StreamSession session; + protected final Version inputVersion; + protected final long repairedAt; + protected final UUID pendingRepair; + protected final SSTableFormat.Type format; + protected final int sstableLevel; + protected final SerializationHeader.Component header; + protected final int fileSeqNum; + + public CassandraStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session) + { + if (session.getPendingRepair() != null) + { + // we should only ever be streaming pending repair + // sstables if the session has a pending repair id + assert session.getPendingRepair().equals(header.pendingRepair); + } + this.session = session; + this.tableId = header.tableId; + this.estimatedKeys = streamHeader.estimatedKeys; + this.sections = streamHeader.sections; + this.inputVersion = streamHeader.version; + this.repairedAt = header.repairedAt; + this.pendingRepair = header.pendingRepair; + this.format = streamHeader.format; + this.sstableLevel = streamHeader.sstableLevel; + this.header = streamHeader.serializationHeader; + this.fileSeqNum = header.sequenceNumber; + } + + /** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. + */ + @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed + @Override + public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException + { + long totalSize = totalSize(); + + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); + if (cfs == null) + { + // schema was dropped during streaming + throw new IOException("CF " + tableId + " was dropped during streaming"); + } + + logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}', pendingRepair = '{}'.", + session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), + cfs.getTableName(), pendingRepair); + + StreamDeserializer deserializer = null; + SSTableMultiWriter writer = null; + try (StreamCompressionInputStream streamCompressionInputStream = new StreamCompressionInputStream(inputPlus, StreamMessage.CURRENT_VERSION)) + { + TrackedDataInputPlus in = new TrackedDataInputPlus(streamCompressionInputStream); + deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata())); + writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format); + while (in.getBytesRead() < totalSize) + { + writePartition(deserializer, writer); + // TODO move this to BytesReadTracker + session.progress(writer.getFilename(), ProgressInfo.Direction.IN, in.getBytesRead(), totalSize); + } + logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", + session.planId(), fileSeqNum, session.peer, FBUtilities.prettyPrintMemory(in.getBytesRead()), FBUtilities.prettyPrintMemory(totalSize)); + return writer; + } + catch (Throwable e) + { + Object partitionKey = deserializer != null ? 
deserializer.partitionKey() : ""; + logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.", + session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName(), e); + if (writer != null) + { + writer.abort(e); + } + throw Throwables.propagate(e); + } + } + + protected SerializationHeader getHeader(TableMetadata metadata) + { + return header != null? header.toHeader(metadata) : null; //pre-3.0 sstable have no SerializationHeader + } + @SuppressWarnings("resource") + protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, UUID pendingRepair, SSTableFormat.Type format) throws IOException + { + Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize); + if (localDir == null) + throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize))); + + StreamReceiver streamReceiver = session.getAggregator(tableId); + Preconditions.checkState(streamReceiver instanceof CassandraStreamReceiver); - LifecycleTransaction txn = CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).getTransaction(); ++ LifecycleNewTracker lifecycleNewTracker = CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).createLifecycleNewTracker(); + - RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, false, format, sstableLevel, totalSize, txn, getHeader(cfs.metadata())); ++ RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, false, format, sstableLevel, totalSize, lifecycleNewTracker, getHeader(cfs.metadata())); + return writer; + } + + protected long totalSize() + { + long size = 0; + for (SSTableReader.PartitionPositionBounds section : sections) + size += section.upperPosition - section.lowerPosition; + return size; + } + + protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer) throws IOException + { + writer.append(deserializer.newPartition()); + deserializer.checkForExceptions(); + } + + public static class StreamDeserializer extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator + { + private final TableMetadata metadata; + private final DataInputPlus in; + private final SerializationHeader header; + private final SerializationHelper helper; + + private DecoratedKey key; + private DeletionTime partitionLevelDeletion; + private SSTableSimpleIterator iterator; + private Row staticRow; + private IOException exception; + + public StreamDeserializer(TableMetadata metadata, DataInputPlus in, Version version, SerializationHeader header) throws IOException + { + this.metadata = metadata; + this.in = in; + this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE); + this.header = header; + } + + public StreamDeserializer newPartition() throws IOException + { + key = metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in)); + partitionLevelDeletion = DeletionTime.serializer.deserialize(in); + iterator = SSTableSimpleIterator.create(metadata, in, header, helper, partitionLevelDeletion); + staticRow = iterator.readStaticRow(); + return this; + } + + public TableMetadata metadata() + { + return metadata; + } + + public RegularAndStaticColumns columns() + { + // We don't know which columns we'll get so assume it can be all of them + return metadata.regularAndStaticColumns(); + } + + 
public boolean isReverseOrder() + { + return false; + } + + public DecoratedKey partitionKey() + { + return key; + } + + public DeletionTime partitionLevelDeletion() + { + return partitionLevelDeletion; + } + + public Row staticRow() + { + return staticRow; + } + + public EncodingStats stats() + { + return header.stats(); + } + + public boolean hasNext() + { + try + { + return iterator.hasNext(); + } + catch (IOError e) + { + if (e.getCause() != null && e.getCause() instanceof IOException) + { + exception = (IOException)e.getCause(); + return false; + } + throw e; + } + } + + public Unfiltered next() + { + // Note that in practice we know that IOException will be thrown by hasNext(), because that's + // where the actual reading happens, so we don't bother catching RuntimeException here (contrarily + // to what we do in hasNext) + Unfiltered unfiltered = iterator.next(); + return metadata.isCounter() && unfiltered.kind() == Unfiltered.Kind.ROW + ? maybeMarkLocalToBeCleared((Row) unfiltered) + : unfiltered; + } + + private Row maybeMarkLocalToBeCleared(Row row) + { + return metadata.isCounter() ? row.markCounterLocalToBeCleared() : row; + } + + public void checkForExceptions() throws IOException + { + if (exception != null) + throw exception; + } + + public void close() + { + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java index bb5531e,0000000..b2b2ce5 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java @@@ -1,249 -1,0 +1,280 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; + ++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; ++import org.apache.cassandra.io.sstable.SSTable; ++import org.apache.cassandra.streaming.StreamReceiveTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.ThrottledUnfilteredIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.view.View; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.streaming.IncomingStream; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.CloseableIterator; +import org.apache.cassandra.utils.Throwables; +import org.apache.cassandra.utils.concurrent.Refs; + +public class CassandraStreamReceiver implements StreamReceiver +{ + private static final Logger logger = LoggerFactory.getLogger(CassandraStreamReceiver.class); + + private static final int MAX_ROWS_PER_BATCH = Integer.getInteger("cassandra.repair.mutation_repair_rows_per_batch", 100); + + private final ColumnFamilyStore cfs; + private final StreamSession session; + + // Transaction tracking new files received + private final LifecycleTransaction txn; + + // holds references to SSTables received + protected Collection<SSTableReader> sstables; + + private final boolean requiresWritePath; + + + public CassandraStreamReceiver(ColumnFamilyStore cfs, StreamSession session, int totalFiles) + { + this.cfs = cfs; + this.session = session; + // this is an "offline" transaction, as we currently manually expose the sstables once done; + // this should be revisited at a later date, so that LifecycleTransaction manages all sstable state changes + this.txn = LifecycleTransaction.offline(OperationType.STREAM); + this.sstables = new ArrayList<>(totalFiles); + this.requiresWritePath = requiresWritePath(cfs); + } + - public LifecycleTransaction getTransaction() - { - return txn; - } - + public static CassandraStreamReceiver fromReceiver(StreamReceiver receiver) + { + Preconditions.checkArgument(receiver instanceof CassandraStreamReceiver); + return (CassandraStreamReceiver) receiver; + } + + private static CassandraIncomingFile getFile(IncomingStream stream) + { + Preconditions.checkArgument(stream instanceof CassandraIncomingFile, "Wrong stream type: {}", stream); + return (CassandraIncomingFile) stream; + } + + @Override + @SuppressWarnings("resource") - public void received(IncomingStream stream) ++ public synchronized void received(IncomingStream stream) + { + CassandraIncomingFile file = getFile(stream); + + Collection<SSTableReader> finished = null; + SSTableMultiWriter sstable = file.getSSTable(); + try + { + finished = 
+
+    @Override
+    public void discardStream(IncomingStream stream)
+    {
+        CassandraIncomingFile file = getFile(stream);
+        Throwables.maybeFail(file.getSSTable().abort(null));
+    }
+
++    /**
++     * @return a LifecycleNewTracker whose operations are synchronised on this StreamReceiveTask.
++     */
++    public synchronized LifecycleNewTracker createLifecycleNewTracker()
++    {
++        return new LifecycleNewTracker()
++        {
++            @Override
++            public void trackNew(SSTable table)
++            {
++                synchronized (CassandraStreamReceiver.this)
++                {
++                    txn.trackNew(table);
++                }
++            }
++
++            @Override
++            public void untrackNew(SSTable table)
++            {
++                synchronized (CassandraStreamReceiver.this)
++                {
++                    txn.untrackNew(table);
++                }
++            }
++
++            @Override
++            public OperationType opType()
++            {
++                return txn.opType();
++            }
++        };
++    }
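
createLifecycleNewTracker() is the heart of this fix (CASSANDRA-14554): several stream threads may track or untrack new sstables concurrently, but the underlying LifecycleTransaction is not thread-safe, so every mutating call takes the receiver's monitor, the same lock guarding received(), abort() and finishTransaction(). A self-contained sketch of that shape, with illustrative types in place of Cassandra's:

    import java.util.ArrayList;
    import java.util.List;

    final class Owner
    {
        // Not thread-safe on its own; all access below funnels through Owner's monitor.
        private final List<String> tracked = new ArrayList<>();

        interface Tracker
        {
            void trackNew(String item);
            void untrackNew(String item);
        }

        // Hand out a narrow view whose mutators take the owner's lock, so
        // callers on other threads serialise against finish() and each other.
        synchronized Tracker createTracker()
        {
            return new Tracker()
            {
                @Override
                public void trackNew(String item)
                {
                    synchronized (Owner.this) { tracked.add(item); }
                }

                @Override
                public void untrackNew(String item)
                {
                    synchronized (Owner.this) { tracked.remove(item); }
                }
            };
        }

        synchronized void finish()
        {
            // Holds the same monitor as trackNew/untrackNew, so it can never
            // observe the list mid-update.
            tracked.clear();
        }
    }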
++
++
+    @Override
-    public void abort()
++    public synchronized void abort()
+    {
+        sstables.clear();
+        txn.abort();
+    }
+
+    private boolean hasViews(ColumnFamilyStore cfs)
+    {
+        return !Iterables.isEmpty(View.findAll(cfs.metadata.keyspace, cfs.getTableName()));
+    }
+
+    private boolean hasCDC(ColumnFamilyStore cfs)
+    {
+        return cfs.metadata().params.cdc;
+    }
+
+    /*
+     * We have a special path for views and for CDC.
+     *
+     * For views, since the view requires cleaning up any pre-existing state, we must put all partitions
+     * through the same write path as normal mutations. This also ensures any secondary indexes (2is) are updated.
+     *
+     * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they
+     * can be archived by the CDC process on discard.
+     */
+    private boolean requiresWritePath(ColumnFamilyStore cfs)
+    {
+        return hasCDC(cfs) || (session.streamOperation().requiresViewBuild() && hasViews(cfs));
+    }
+
+    private void sendThroughWritePath(ColumnFamilyStore cfs, Collection<SSTableReader> readers)
+    {
+        boolean hasCdc = hasCDC(cfs);
+        ColumnFilter filter = ColumnFilter.all(cfs.metadata());
+        for (SSTableReader reader : readers)
+        {
+            Keyspace ks = Keyspace.open(reader.getKeyspaceName());
+            // When doing mutation-based repair we split each partition into smaller batches
+            // ({@link Stream MAX_ROWS_PER_BATCH}) to avoid OOMing and generating heap pressure
+            try (ISSTableScanner scanner = reader.getScanner();
+                 CloseableIterator<UnfilteredRowIterator> throttledPartitions = ThrottledUnfilteredIterator.throttle(scanner, MAX_ROWS_PER_BATCH))
+            {
+                while (throttledPartitions.hasNext())
+                {
+                    // MV *can* be applied unsafe if there's no CDC on the CFS, as we flush
+                    // before the transaction is done.
+                    //
+                    // If the CFS has CDC, however, these updates need to be written to the CommitLog
+                    // so they get archived into the cdc_raw folder
+                    ks.apply(new Mutation(PartitionUpdate.fromIterator(throttledPartitions.next(), filter)),
+                             hasCdc,
+                             true,
+                             false);
+                }
+            }
+        }
+    }
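
sendThroughWritePath() re-applies streamed partitions as ordinary mutations, capped at MAX_ROWS_PER_BATCH rows per mutation (default 100, tunable via the cassandra.repair.mutation_repair_rows_per_batch system property) to bound heap pressure. A generic sketch of the batching idea (this is not ThrottledUnfilteredIterator, just the concept):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    final class Batching
    {
        // Slice a large row stream into fixed-size chunks so each unit of
        // work (one mutation, above) touches a bounded number of rows.
        static <T> Iterator<List<T>> inBatches(Iterator<T> rows, int maxRowsPerBatch)
        {
            return new Iterator<List<T>>()
            {
                @Override
                public boolean hasNext()
                {
                    return rows.hasNext();
                }

                @Override
                public List<T> next()
                {
                    List<T> batch = new ArrayList<>(maxRowsPerBatch);
                    while (rows.hasNext() && batch.size() < maxRowsPerBatch)
                        batch.add(rows.next());
                    return batch;
                }
            };
        }
    }

Passing, say, -Dcassandra.repair.mutation_repair_rows_per_batch=50 at startup would halve the rows applied per mutation.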
+
-    private synchronized void finishTransaction()
++    public synchronized void finishTransaction()
+    {
+        txn.finish();
+    }
+
+    @Override
+    public void finished()
+    {
+        boolean requiresWritePath = requiresWritePath(cfs);
+        Collection<SSTableReader> readers = sstables;
+
+        try (Refs<SSTableReader> refs = Refs.ref(readers))
+        {
+            if (requiresWritePath)
+            {
+                sendThroughWritePath(cfs, readers);
+            }
+            else
+            {
+                finishTransaction();
+
+                // add sstables (this will build secondary indexes too, see CASSANDRA-10130)
+                logger.debug("[Stream #{}] Received {} sstables from {} ({})", session.planId(), readers.size(), session.peer, readers);
+                cfs.addSSTables(readers);
+
+                // invalidate row and counter cache
+                if (cfs.isRowCacheEnabled() || cfs.metadata().isCounter())
+                {
+                    List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+                    readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
+                    Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+                    if (cfs.isRowCacheEnabled())
+                    {
+                        int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+                        if (invalidatedKeys > 0)
+                            logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+                                         "receive task completed.", session.planId(), invalidatedKeys,
+                                         cfs.keyspace.getName(), cfs.getTableName());
+                    }
+
+                    if (cfs.metadata().isCounter())
+                    {
+                        int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+                        if (invalidatedKeys > 0)
+                            logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+                                         "receive task completed.", session.planId(), invalidatedKeys,
+                                         cfs.keyspace.getName(), cfs.getTableName());
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void cleanup()
+    {
+        // We don't keep the streamed sstables since we've applied them manually, so we abort the txn and delete
+        // the streamed sstables.
+        if (requiresWritePath)
+        {
+            cfs.forceBlockingFlush();
+            abort();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index eb5c5fe,76e4dbb..a84f07e
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@@ -21,9 -22,10 +21,9 @@@ import java.util.Collection
 import java.util.Collections;
 import java.util.UUID;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@@ -116,9 -114,9 +116,9 @@@ public class SimpleSSTableMultiWriter i
                                              MetadataCollector metadataCollector,
                                              SerializationHeader header,
                                              Collection<Index> indexes,
-                                             LifecycleTransaction txn)
+                                             LifecycleNewTracker lifecycleNewTracker)
     {
-        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, indexes, txn);
-        return new SimpleSSTableMultiWriter(writer, txn);
- SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, indexes, lifecycleNewTracker);
++        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, indexes, lifecycleNewTracker);
+        return new SimpleSSTableMultiWriter(writer, lifecycleNewTracker);
     }
 }
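
SimpleSSTableMultiWriter now accepts the narrower LifecycleNewTracker instead of a full LifecycleTransaction, which is what lets the stream receiver hand writers a synchronised view rather than the raw transaction. Inferred from the calls made in this commit (the authoritative definition is the new LifecycleNewTracker.java), the interface presumably looks roughly like:

    import org.apache.cassandra.db.compaction.OperationType;
    import org.apache.cassandra.io.sstable.SSTable;

    // Presumed shape, reconstructed from the trackNew/untrackNew/opType
    // calls above; see LifecycleNewTracker.java in this commit for the
    // real definition.
    public interface LifecycleNewTracker
    {
        void trackNew(SSTable table);   // record a new sstable being written
        void untrackNew(SSTable table); // forget an sstable whose write was aborted
        OperationType opType();         // the operation this tracker serves
    }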

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
index 29fa573,3358225..ef4deb7
--- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@@ -43,18 -42,16 +43,18 @@@ public class RangeAwareSSTableWriter im
     private final int sstableLevel;
     private final long estimatedKeys;
     private final long repairedAt;
+    private final UUID pendingRepair;
+    private final boolean isTransient;
     private final SSTableFormat.Type format;
     private final SerializationHeader header;
-    private final LifecycleTransaction txn;
+    private final LifecycleNewTracker lifecycleNewTracker;
     private int currentIndex = -1;
     public final ColumnFamilyStore cfs;
     private final List<SSTableMultiWriter> finishedWriters = new ArrayList<>();
     private final List<SSTableReader> finishedReaders = new ArrayList<>();
     private SSTableMultiWriter currentWriter = null;
 
-    public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, UUID pendingRepair, boolean isTransient, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
- public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleNewTracker lifecycleNewTracker, SerializationHeader header) throws IOException
++    public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, UUID pendingRepair, boolean isTransient, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleNewTracker lifecycleNewTracker, SerializationHeader header) throws IOException
     {
         DiskBoundaries db = cfs.getDiskBoundaries();
         directories = db.directories;
@@@ -62,10 -59,8 +62,10 @@@
         this.cfs = cfs;
         this.estimatedKeys = estimatedKeys / directories.size();
         this.repairedAt = repairedAt;
+        this.pendingRepair = pendingRepair;
+        this.isTransient = isTransient;
         this.format = format;
-        this.txn = txn;
+        this.lifecycleNewTracker = lifecycleNewTracker;
         this.header = header;
         boundaries = db.positions;
         if (boundaries == null)
@@@ -74,8 -69,8 +74,8 @@@
             if (localDir == null)
                 throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize)));
 
-            Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
-            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, lifecycleNewTracker);
+            Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(localDir), format);
-            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, isTransient, sstableLevel, header, txn);
++            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, isTransient, sstableLevel, header, lifecycleNewTracker);
         }
     }
 
@@@ -96,8 -91,8 +96,8 @@@
             if (currentWriter != null)
                 finishedWriters.add(currentWriter);
 
-            Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex))), format);
-            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, lifecycleNewTracker);
+            Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex)), format);
-            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, isTransient, sstableLevel, header, txn);
++            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, isTransient, sstableLevel, header, lifecycleNewTracker);
         }
     }
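
The hunk above is RangeAwareSSTableWriter's writer-rolling logic: output is split across the data directories at precomputed disk boundaries, retiring the current writer into finishedWriters whenever a key crosses into the next directory's token range. A self-contained sketch of that boundary-rolling idea, with simplified, hypothetical types in place of Cassandra's:

    import java.util.ArrayList;
    import java.util.List;

    // Roll to a new "writer" (here just a label) whenever a key's position
    // crosses the next boundary, mirroring how RangeAwareSSTableWriter
    // retires currentWriter into finishedWriters.
    final class BoundaryRollingWriter
    {
        private final long[] boundaries; // upper bound per directory, ascending
        private final List<String> finished = new ArrayList<>();
        private int currentIndex = -1;
        private String currentWriter = null;

        BoundaryRollingWriter(long[] boundaries)
        {
            this.boundaries = boundaries;
        }

        void append(long token)
        {
            // Tokens past the last boundary stay with the final writer.
            while (currentIndex + 1 < boundaries.length
                   && (currentIndex < 0 || token > boundaries[currentIndex]))
            {
                if (currentWriter != null)
                    finished.add(currentWriter);
                currentIndex++;
                currentWriter = "writer-for-dir-" + currentIndex;
            }
            // append the row for this token via currentWriter ...
        }
    }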