Streaming needs to synchronise access to LifecycleTransaction patch by Stefania Alborghetti and Benedict; reviewed by Robert Stupp for CASSANDRA-14554
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84ffcb82 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84ffcb82 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84ffcb82 Branch: refs/heads/cassandra-3.0 Commit: 84ffcb82a74667b957201f2cdae2d6b308956549 Parents: bbf7dac Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Wed Nov 7 14:07:12 2018 +0800 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Mon Dec 10 15:00:00 2018 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 8 ++-- .../compaction/AbstractCompactionStrategy.java | 6 +-- .../compaction/CompactionStrategyManager.java | 8 ++-- .../db/lifecycle/LifecycleNewTracker.java | 47 ++++++++++++++++++++ .../db/lifecycle/LifecycleTransaction.java | 7 ++- .../apache/cassandra/db/lifecycle/LogFile.java | 24 ++++------ .../cassandra/db/lifecycle/LogTransaction.java | 2 +- .../io/sstable/SimpleSSTableMultiWriter.java | 6 +-- .../io/sstable/format/SSTableWriter.java | 24 +++++----- .../io/sstable/format/big/BigFormat.java | 6 +-- .../io/sstable/format/big/BigTableWriter.java | 6 +-- .../cassandra/streaming/StreamReader.java | 6 +-- .../cassandra/streaming/StreamReceiveTask.java | 37 +++++++++++++-- .../cassandra/streaming/StreamSession.java | 4 +- .../unit/org/apache/cassandra/db/ScrubTest.java | 9 ++-- 16 files changed, 140 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f9b59df..01d4789 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.18 + * Streaming needs to synchronise access to LifecycleTransaction (CASSANDRA-14554) * Fix cassandra-stress write hang with default options (CASSANDRA-14616) * Differentiate between slices and RTs when decoding legacy bounds (CASSANDRA-14919) * CommitLogReplayer.handleReplayError should print stack traces (CASSANDRA-14589) http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 4c7bc46..c455c4c 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -475,15 +475,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return directories; } - public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn) + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) { MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel); - return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn); + return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker); } - public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn) + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) { - return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, txn); + return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, lifecycleNewTracker); } public boolean supportsEarlyOpen() http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index a80a6f4..9f07691 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -27,7 +27,7 @@ import com.google.common.util.concurrent.RateLimiter; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.SerializationHeader; -import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter; @@ -514,9 +514,9 @@ public abstract class AbstractCompactionStrategy return groupedSSTables; } - public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector meta, SerializationHeader header, LifecycleTransaction txn) + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector meta, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) { - return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, txn); + return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, lifecycleNewTracker); } public boolean supportsEarlyOpen() http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index a9bfbd2..1d3d18c 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -21,7 +21,7 @@ package org.apache.cassandra.db.compaction; import java.util.*; import java.util.concurrent.Callable; -import com.google.common.collect.Iterables; +import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -490,15 +490,15 @@ public class CompactionStrategyManager implements INotificationConsumer return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES)); } - public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn) + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) { if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) { - return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn); + return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker); } else { - return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn); + return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java new file mode 100644 index 0000000..9a0785c --- /dev/null +++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.cassandra.db.lifecycle; + +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.io.sstable.SSTable; + +/** + * An interface for tracking new sstables added to a LifecycleTransaction, possibly through some proxy. + */ +public interface LifecycleNewTracker +{ + /** + * Called when a new table is about to be created, so that this table can be tracked by a transaction. + * @param table - the new table to be tracked + */ + void trackNew(SSTable table); + + + /** + * Called when a new table is no longer required, so that this table can be untracked by a transaction. + * @param table - the table to be untracked + */ + void untrackNew(SSTable table); + + /** + * @return the type of operation tracking these sstables + */ + OperationType opType(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java index 582c9d8..af9a80a 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java @@ -55,7 +55,7 @@ import static org.apache.cassandra.utils.concurrent.Refs.selfRefs; * action to occur at the beginning of the commit phase, but also *requires* that the prepareToCommit() phase only take * actions that can be rolled back. */ -public class LifecycleTransaction extends Transactional.AbstractTransactional +public class LifecycleTransaction extends Transactional.AbstractTransactional implements LifecycleNewTracker { private static final Logger logger = LoggerFactory.getLogger(LifecycleTransaction.class); @@ -176,6 +176,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional return log; } + @Override //LifecycleNewTracker public OperationType opType() { return log.type(); @@ -523,11 +524,15 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional return getFirst(originals, null); } + // LifecycleNewTracker + + @Override public void trackNew(SSTable table) { log.trackNew(table); } + @Override public void untrackNew(SSTable table) { log.untrackNew(table); http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/db/lifecycle/LogFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java index 8425a6d..e9047ad 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java @@ -246,13 +246,11 @@ final class LogFile implements AutoCloseable void commit() { - assert !completed() : "Already completed!"; addRecord(LogRecord.makeCommit(System.currentTimeMillis())); } void abort() { - assert !completed() : "Already completed!"; addRecord(LogRecord.makeAbort(System.currentTimeMillis())); } @@ -281,20 +279,13 @@ final class LogFile implements AutoCloseable void add(Type type, SSTable table) { - add(makeRecord(type, table)); - } - - void add(LogRecord record) - { - if (!addRecord(record)) - throw new IllegalStateException(); + addRecord(makeRecord(type, table)); } public void addAll(Type type, Iterable<SSTableReader> toBulkAdd) { for (LogRecord record : makeRecords(type, toBulkAdd).values()) - if (!addRecord(record)) - throw new IllegalStateException(); + addRecord(record); } Map<SSTable, LogRecord> makeRecords(Type type, Iterable<SSTableReader> tables) @@ -332,14 +323,17 @@ final class LogFile implements AutoCloseable return record.asType(type); } - private boolean addRecord(LogRecord record) + void addRecord(LogRecord record) { + if (completed()) + throw new IllegalStateException("Transaction already completed"); + if (records.contains(record)) - return false; + throw new IllegalStateException("Record already exists"); replicas.append(record); - - return records.add(record); + if (!records.add(record)) + throw new IllegalStateException("Failed to add record"); } void remove(Type type, SSTable table) http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java index a10bcd2..00a222a 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java @@ -164,7 +164,7 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran return new SSTableTidier(reader, true, this); } - txnFile.add(logRecord); + txnFile.addRecord(logRecord); if (tracker != null) tracker.notifyDeleting(reader); http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java index fd1b9a7..ded070e 100644 --- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java @@ -25,7 +25,7 @@ import java.util.UUID; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; -import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; @@ -109,9 +109,9 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter CFMetaData cfm, MetadataCollector metadataCollector, SerializationHeader header, - LifecycleTransaction txn) + LifecycleNewTracker lifecycleNewTracker) { - SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, txn); + SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, lifecycleNewTracker); return new SimpleSSTableMultiWriter(writer); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index 5f35029..fcc23a2 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -30,7 +30,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; -import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; @@ -90,16 +90,16 @@ public abstract class SSTableWriter extends SSTable implements Transactional CFMetaData metadata, MetadataCollector metadataCollector, SerializationHeader header, - LifecycleTransaction txn) + LifecycleNewTracker lifecycleNewTracker) { Factory writerFactory = descriptor.getFormat().getWriterFactory(); - return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn); + return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, lifecycleNewTracker); } - public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn) + public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) { CFMetaData metadata = Schema.instance.getCFMetaData(descriptor); - return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, txn); + return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, lifecycleNewTracker); } public static SSTableWriter create(CFMetaData metadata, @@ -108,21 +108,21 @@ public abstract class SSTableWriter extends SSTable implements Transactional long repairedAt, int sstableLevel, SerializationHeader header, - LifecycleTransaction txn) + LifecycleNewTracker lifecycleNewTracker) { MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel); - return create(descriptor, keyCount, repairedAt, metadata, collector, header, txn); + return create(descriptor, keyCount, repairedAt, metadata, collector, header, lifecycleNewTracker); } - public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header,LifecycleTransaction txn) + public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) { - return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, txn); + return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, lifecycleNewTracker); } @VisibleForTesting - public static SSTableWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header, LifecycleTransaction txn) + public static SSTableWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) { - return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0, header, txn); + return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0, header, lifecycleNewTracker); } private static Set<Component> components(CFMetaData metadata) @@ -285,6 +285,6 @@ public abstract class SSTableWriter extends SSTable implements Transactional CFMetaData metadata, MetadataCollector metadataCollector, SerializationHeader header, - LifecycleTransaction txn); + LifecycleNewTracker lifecycleNewTracker); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java index ae93c5f..360ef8a 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java @@ -22,7 +22,7 @@ import java.util.Set; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; -import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableFormat; @@ -88,9 +88,9 @@ public class BigFormat implements SSTableFormat CFMetaData metadata, MetadataCollector metadataCollector, SerializationHeader header, - LifecycleTransaction txn) + LifecycleNewTracker lifecycleNewTracker) { - return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn); + return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, lifecycleNewTracker); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index 0d500c1..f733619 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -21,12 +21,12 @@ import java.io.*; import java.util.Map; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; @@ -63,10 +63,10 @@ public class BigTableWriter extends SSTableWriter CFMetaData metadata, MetadataCollector metadataCollector, SerializationHeader header, - LifecycleTransaction txn) + LifecycleNewTracker lifecycleNewTracker) { super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header); - txn.trackNew(this); // must track before any files are created + lifecycleNewTracker.trackNew(this); // must track before any files are created if (compression) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java index 4ca7937..07278cb 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@ -34,6 +34,7 @@ import com.ning.compress.lzf.LZFInputStream; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableMultiWriter; @@ -49,8 +50,6 @@ import org.apache.cassandra.io.util.TrackedInputStream; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; -import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause; - /** * StreamReader reads from stream and writes to SSTable. */ @@ -156,7 +155,8 @@ public class StreamReader throw new IOException("Insufficient disk space to store " + totalSize + " bytes"); desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format)); - return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, getHeader(cfs.metadata), session.getTransaction(cfId)); + return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, getHeader(cfs.metadata), + session.getReceivingTask(cfId).createLifecycleNewTracker()); } protected long totalSize() http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index 9e65d34..ea82d9b 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -26,6 +26,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.google.common.collect.Iterables; + +import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +44,7 @@ import org.apache.cassandra.db.view.View; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -132,11 +135,39 @@ public class StreamReceiveTask extends StreamTask return totalSize; } - public synchronized LifecycleTransaction getTransaction() + /** + * @return a LifecycleNewTracker whose operations are synchronised on this StreamReceiveTask. + */ + public synchronized LifecycleNewTracker createLifecycleNewTracker() { if (done) - throw new RuntimeException(String.format("Stream receive task {} of cf {} already finished.", session.planId(), cfId)); - return txn; + throw new RuntimeException(String.format("Stream receive task %s of cf %s already finished.", session.planId(), cfId)); + + return new LifecycleNewTracker() + { + @Override + public void trackNew(SSTable table) + { + synchronized (StreamReceiveTask.this) + { + txn.trackNew(table); + } + } + + @Override + public void untrackNew(SSTable table) + { + synchronized (StreamReceiveTask.this) + { + txn.untrackNew(table); + } + } + + public OperationType opType() + { + return txn.opType(); + } + }; } private static class OnCompletionRunnable implements Runnable http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index d57fae8..c79a711 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -208,10 +208,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber } - public LifecycleTransaction getTransaction(UUID cfId) + StreamReceiveTask getReceivingTask(UUID cfId) { assert receivers.containsKey(cfId); - return receivers.get(cfId).getTransaction(); + return receivers.get(cfId); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/84ffcb82/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java index fc2faea..757add9 100644 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@ -26,6 +26,7 @@ import java.nio.file.Paths; import java.util.*; import java.util.concurrent.ExecutionException; +import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.apache.commons.lang3.StringUtils; import org.junit.BeforeClass; import org.junit.Test; @@ -632,11 +633,11 @@ public class ScrubTest assertOrdered(Util.cmd(cfs).filterOn(colName, Operator.EQ, 1L).build(), numRows / 2); } - private static SSTableMultiWriter createTestWriter(Descriptor descriptor, long keyCount, CFMetaData metadata, LifecycleTransaction txn) + private static SSTableMultiWriter createTestWriter(Descriptor descriptor, long keyCount, CFMetaData metadata, LifecycleNewTracker lifecycleNewTracker) { SerializationHeader header = new SerializationHeader(true, metadata, metadata.partitionColumns(), EncodingStats.NO_STATS); MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(0); - return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, metadata, collector, header, txn)); + return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, metadata, collector, header, lifecycleNewTracker)); } private static class TestMultiWriter extends SimpleSSTableMultiWriter @@ -653,9 +654,9 @@ public class ScrubTest private static class TestWriter extends BigTableWriter { TestWriter(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, - MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn) + MetadataCollector collector, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) { - super(descriptor, keyCount, repairedAt, metadata, collector, header, txn); + super(descriptor, keyCount, repairedAt, metadata, collector, header, lifecycleNewTracker); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org