Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 bbf7dac87 -> 84ffcb82a
  refs/heads/cassandra-3.11 6a449b88d -> 732c43b0f
  refs/heads/trunk 4dd7faa75 -> 33eada06a


Prevent concurrent access to lifecycle txn when streaming

patch by Stefania Alborghetti; reviewed by Robert Stupp for CASSANDRA-14554


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3539a078
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3539a078
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3539a078

Branch: refs/heads/cassandra-3.11
Commit: 3539a07884d0a94c1432d7dda0ec17676195cc53
Parents: bbf7dac
Author: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Authored: Wed Nov 7 14:07:12 2018 +0800
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Fri Dec 7 12:18:05 2018 +0000

----------------------------------------------------------------------
 .../apache/cassandra/db/ColumnFamilyStore.java  |  8 ++--
 .../compaction/AbstractCompactionStrategy.java  |  6 +--
 .../compaction/CompactionStrategyManager.java   |  8 ++--
 .../db/lifecycle/LifecycleNewTracker.java       | 47 ++++++++++++++++++++
 .../db/lifecycle/LifecycleTransaction.java      |  7 ++-
 .../apache/cassandra/db/lifecycle/LogFile.java  | 24 ++++------
 .../cassandra/db/lifecycle/LogTransaction.java  |  2 +-
 .../io/sstable/SimpleSSTableMultiWriter.java    |  6 +--
 .../io/sstable/format/SSTableWriter.java        | 24 +++++-----
 .../io/sstable/format/big/BigFormat.java        |  6 +--
 .../io/sstable/format/big/BigTableWriter.java   |  6 +--
 .../cassandra/streaming/StreamReader.java       |  6 +--
 .../cassandra/streaming/StreamReceiveTask.java  | 37 +++++++++++++--
 .../cassandra/streaming/StreamSession.java      |  4 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |  9 ++--
 15 files changed, 139 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3539a078/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 4c7bc46..c455c4c 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -475,15 +475,15 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         return directories;
     }
 
-    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, 
LifecycleTransaction txn)
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, 
LifecycleNewTracker lifecycleNewTracker)
     {
         MetadataCollector collector = new 
MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
-        return createSSTableMultiWriter(descriptor, keyCount, repairedAt, 
collector, header, txn);
+        return createSSTableMultiWriter(descriptor, keyCount, repairedAt, 
collector, header, lifecycleNewTracker);
     }
 
-    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, MetadataCollector metadataCollector, 
SerializationHeader header, LifecycleTransaction txn)
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, MetadataCollector metadataCollector, 
SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
     {
-        return 
getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, 
repairedAt, metadataCollector, header, txn);
+        return 
getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, 
repairedAt, metadataCollector, header, lifecycleNewTracker);
     }
 
     public boolean supportsEarlyOpen()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3539a078/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index a80a6f4..9f07691 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -27,7 +27,7 @@ import com.google.common.util.concurrent.RateLimiter;
 
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter;
@@ -514,9 +514,9 @@ public abstract class AbstractCompactionStrategy
         return groupedSSTables;
     }
 
-    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, MetadataCollector meta, SerializationHeader 
header, LifecycleTransaction txn)
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, MetadataCollector meta, SerializationHeader 
header, LifecycleNewTracker lifecycleNewTracker)
     {
-        return SimpleSSTableMultiWriter.create(descriptor, keyCount, 
repairedAt, cfs.metadata, meta, header, txn);
+        return SimpleSSTableMultiWriter.create(descriptor, keyCount, 
repairedAt, cfs.metadata, meta, header, lifecycleNewTracker);
     }
 
     public boolean supportsEarlyOpen()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3539a078/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index a9bfbd2..1d3d18c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -21,7 +21,7 @@ package org.apache.cassandra.db.compaction;
 import java.util.*;
 import java.util.concurrent.Callable;
 
-import com.google.common.collect.Iterables;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -490,15 +490,15 @@ public class CompactionStrategyManager implements 
INotificationConsumer
         return 
Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
     }
 
-    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, MetadataCollector collector, 
SerializationHeader header, LifecycleTransaction txn)
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, 
long keyCount, long repairedAt, MetadataCollector collector, 
SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
     {
         if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
         {
-            return unrepaired.createSSTableMultiWriter(descriptor, keyCount, 
repairedAt, collector, header, txn);
+            return unrepaired.createSSTableMultiWriter(descriptor, keyCount, 
repairedAt, collector, header, lifecycleNewTracker);
         }
         else
         {
-            return repaired.createSSTableMultiWriter(descriptor, keyCount, 
repairedAt, collector, header, txn);
+            return repaired.createSSTableMultiWriter(descriptor, keyCount, 
repairedAt, collector, header, lifecycleNewTracker);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3539a078/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java 
b/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java
new file mode 100644
index 0000000..9a0785c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.cassandra.db.lifecycle;
+
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.SSTable;
+
+/**
+ * An interface for tracking new sstables added to a LifecycleTransaction, possibly through some proxy.
+ */
+public interface LifecycleNewTracker
+{
+    /**
+     * Called when a new table is about to be created, so that this table can be tracked by a transaction.
+     * @param table - the new table to be tracked
+     */
+    void trackNew(SSTable table);
+
+
+    /**
+     * Called when a new table is no longer required, so that this table can be untracked by a transaction.
+     * @param table - the table to be untracked
+     */
+    void untrackNew(SSTable table);
+
+    /**
+     * @return the type of operation tracking these sstables
+     */
+    OperationType opType();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3539a078/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java 
b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index 582c9d8..af9a80a 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -55,7 +55,7 @@ import static 
org.apache.cassandra.utils.concurrent.Refs.selfRefs;
  * action to occur at the beginning of the commit phase, but also *requires* 
that the prepareToCommit() phase only take
  * actions that can be rolled back.
  */
-public class LifecycleTransaction extends Transactional.AbstractTransactional
+public class LifecycleTransaction extends Transactional.AbstractTransactional 
implements LifecycleNewTracker
 {
     private static final Logger logger = 
LoggerFactory.getLogger(LifecycleTransaction.class);
 
@@ -176,6 +176,7 @@ public class LifecycleTransaction extends 
Transactional.AbstractTransactional
         return log;
     }
 
+    @Override //LifecycleNewTracker
     public OperationType opType()
     {
         return log.type();
@@ -523,11 +524,15 @@ public class LifecycleTransaction extends 
Transactional.AbstractTransactional
         return getFirst(originals, null);
     }
 
+    // LifecycleNewTracker
+
+    @Override
     public void trackNew(SSTable table)
     {
         log.trackNew(table);
     }
 
+    @Override
     public void untrackNew(SSTable table)
     {
         log.untrackNew(table);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3539a078/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index 8425a6d..e9047ad 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -246,13 +246,11 @@ final class LogFile implements AutoCloseable
 
     void commit()
     {
-        assert !completed() : "Already completed!";
         addRecord(LogRecord.makeCommit(System.currentTimeMillis()));
     }
 
     void abort()
     {
-        assert !completed() : "Already completed!";
         addRecord(LogRecord.makeAbort(System.currentTimeMillis()));
     }
 
@@ -281,20 +279,13 @@ final class LogFile implements AutoCloseable
 
     void add(Type type, SSTable table)
     {
-        add(makeRecord(type, table));
-    }
-
-    void add(LogRecord record)
-    {
-        if (!addRecord(record))
-            throw new IllegalStateException();
+        addRecord(makeRecord(type, table));
     }
 
     public void addAll(Type type, Iterable<SSTableReader> toBulkAdd)
     {
         for (LogRecord record : makeRecords(type, toBulkAdd).values())
-            if (!addRecord(record))
-                throw new IllegalStateException();
+            addRecord(record);
     }
 
     Map<SSTable, LogRecord> makeRecords(Type type, Iterable<SSTableReader> 
tables)
@@ -332,14 +323,17 @@ final class LogFile implements AutoCloseable
         return record.asType(type);
     }
 
-    private boolean addRecord(LogRecord record)
+    void addRecord(LogRecord record)
     {
+        if (completed())
+            throw new IllegalStateException("Transaction already completed");
+
         if (records.contains(record))
-            return false;
+            throw new IllegalStateException("Record already exists");
 
         replicas.append(record);
-
-        return records.add(record);
+        if (!records.add(record))
+            throw new IllegalStateException("Failed to add record");
     }
 
     void remove(Type type, SSTable table)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3539a078/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
index a10bcd2..00a222a 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
@@ -164,7 +164,7 @@ class LogTransaction extends 
Transactional.AbstractTransactional implements Tran
             return new SSTableTidier(reader, true, this);
         }
 
-        txnFile.add(logRecord);
+        txnFile.addRecord(logRecord);
 
         if (tracker != null)
             tracker.notifyDeleting(reader);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3539a078/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java 
b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index fd1b9a7..ded070e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -25,7 +25,7 @@ import java.util.UUID;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
@@ -109,9 +109,9 @@ public class SimpleSSTableMultiWriter implements 
SSTableMultiWriter
                                             CFMetaData cfm,
                                             MetadataCollector 
metadataCollector,
                                             SerializationHeader header,
-                                            LifecycleTransaction txn)
+                                            LifecycleNewTracker 
lifecycleNewTracker)
     {
-        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, 
repairedAt, cfm, metadataCollector, header, txn);
+        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, 
repairedAt, cfm, metadataCollector, header, lifecycleNewTracker);
         return new SimpleSSTableMultiWriter(writer);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3539a078/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 5f35029..fcc23a2 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -90,16 +90,16 @@ public abstract class SSTableWriter extends SSTable 
implements Transactional
                                        CFMetaData metadata,
                                        MetadataCollector metadataCollector,
                                        SerializationHeader header,
-                                       LifecycleTransaction txn)
+                                       LifecycleNewTracker lifecycleNewTracker)
     {
         Factory writerFactory = descriptor.getFormat().getWriterFactory();
-        return writerFactory.open(descriptor, keyCount, repairedAt, metadata, 
metadataCollector, header, txn);
+        return writerFactory.open(descriptor, keyCount, repairedAt, metadata, 
metadataCollector, header, lifecycleNewTracker);
     }
 
-    public static SSTableWriter create(Descriptor descriptor, long keyCount, 
long repairedAt, int sstableLevel, SerializationHeader header, 
LifecycleTransaction txn)
+    public static SSTableWriter create(Descriptor descriptor, long keyCount, 
long repairedAt, int sstableLevel, SerializationHeader header, 
LifecycleNewTracker lifecycleNewTracker)
     {
         CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
-        return create(metadata, descriptor, keyCount, repairedAt, 
sstableLevel, header, txn);
+        return create(metadata, descriptor, keyCount, repairedAt, 
sstableLevel, header, lifecycleNewTracker);
     }
 
     public static SSTableWriter create(CFMetaData metadata,
@@ -108,21 +108,21 @@ public abstract class SSTableWriter extends SSTable 
implements Transactional
                                        long repairedAt,
                                        int sstableLevel,
                                        SerializationHeader header,
-                                       LifecycleTransaction txn)
+                                       LifecycleNewTracker lifecycleNewTracker)
     {
         MetadataCollector collector = new 
MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
-        return create(descriptor, keyCount, repairedAt, metadata, collector, 
header, txn);
+        return create(descriptor, keyCount, repairedAt, metadata, collector, 
header, lifecycleNewTracker);
     }
 
-    public static SSTableWriter create(String filename, long keyCount, long 
repairedAt, int sstableLevel, SerializationHeader header,LifecycleTransaction 
txn)
+    public static SSTableWriter create(String filename, long keyCount, long 
repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker 
lifecycleNewTracker)
     {
-        return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 
sstableLevel, header, txn);
+        return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 
sstableLevel, header, lifecycleNewTracker);
     }
 
     @VisibleForTesting
-    public static SSTableWriter create(String filename, long keyCount, long 
repairedAt, SerializationHeader header, LifecycleTransaction txn)
+    public static SSTableWriter create(String filename, long keyCount, long 
repairedAt, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
     {
-        return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 
0, header, txn);
+        return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 
0, header, lifecycleNewTracker);
     }
 
     private static Set<Component> components(CFMetaData metadata)
@@ -285,6 +285,6 @@ public abstract class SSTableWriter extends SSTable 
implements Transactional
                                            CFMetaData metadata,
                                            MetadataCollector metadataCollector,
                                            SerializationHeader header,
-                                           LifecycleTransaction txn);
+                                           LifecycleNewTracker 
lifecycleNewTracker);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3539a078/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index ae93c5f..360ef8a 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -22,7 +22,7 @@ import java.util.Set;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
@@ -88,9 +88,9 @@ public class BigFormat implements SSTableFormat
                                   CFMetaData metadata,
                                   MetadataCollector metadataCollector,
                                   SerializationHeader header,
-                                  LifecycleTransaction txn)
+                                  LifecycleNewTracker lifecycleNewTracker)
         {
-            return new BigTableWriter(descriptor, keyCount, repairedAt, 
metadata, metadataCollector, header, txn);
+            return new BigTableWriter(descriptor, keyCount, repairedAt, 
metadata, metadataCollector, header, lifecycleNewTracker);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3539a078/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 0d500c1..f733619 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -21,12 +21,12 @@ import java.io.*;
 import java.util.Map;
 
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.CFMetaData;
@@ -63,10 +63,10 @@ public class BigTableWriter extends SSTableWriter
                           CFMetaData metadata, 
                           MetadataCollector metadataCollector, 
                           SerializationHeader header,
-                          LifecycleTransaction txn)
+                          LifecycleNewTracker lifecycleNewTracker)
     {
         super(descriptor, keyCount, repairedAt, metadata, metadataCollector, 
header);
-        txn.trackNew(this); // must track before any files are created
+        lifecycleNewTracker.trackNew(this); // must track before any files are 
created
 
         if (compression)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3539a078/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java 
b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 4ca7937..07278cb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -34,6 +34,7 @@ import com.ning.compress.lzf.LZFInputStream;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
@@ -49,8 +50,6 @@ import org.apache.cassandra.io.util.TrackedInputStream;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
-import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
-
 /**
  * StreamReader reads from stream and writes to SSTable.
  */
@@ -156,7 +155,8 @@ public class StreamReader
             throw new IOException("Insufficient disk space to store " + 
totalSize + " bytes");
         desc = 
Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir),
 format));
 
-        return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, 
sstableLevel, getHeader(cfs.metadata), session.getTransaction(cfId));
+        return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, 
sstableLevel, getHeader(cfs.metadata),
+                session.getReceivingTask(cfId).createLifecycleNewTracker());
     }
 
     protected long totalSize()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3539a078/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java 
b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 9e65d34..ea82d9b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -26,6 +26,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +44,7 @@ import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -132,11 +135,39 @@ public class StreamReceiveTask extends StreamTask
         return totalSize;
     }
 
-    public synchronized LifecycleTransaction getTransaction()
+    /**
+     * @return a LifecycleNewTracker whose operations are synchronised on this 
StreamReceiveTask.
+     */
+    public synchronized LifecycleNewTracker createLifecycleNewTracker()
     {
         if (done)
-            throw new RuntimeException(String.format("Stream receive task {} 
of cf {} already finished.", session.planId(), cfId));
-        return txn;
+            throw new RuntimeException(String.format("Stream receive task %s 
of cf %s already finished.", session.planId(), cfId));
+
+        return new LifecycleNewTracker()
+        {
+            @Override
+            public void trackNew(SSTable table)
+            {
+                synchronized (StreamReceiveTask.this)
+                {
+                    txn.trackNew(table);
+                }
+            }
+
+            @Override
+            public void untrackNew(SSTable table)
+            {
+                synchronized (StreamReceiveTask.this)
+                {
+                    txn.untrackNew(table);
+                }
+            }
+
+            public OperationType opType()
+            {
+                return txn.opType();
+            }
+        };
     }
 
     private static class OnCompletionRunnable implements Runnable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3539a078/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java 
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index d57fae8..c79a711 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -208,10 +208,10 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
     }
 
 
-    public LifecycleTransaction getTransaction(UUID cfId)
+    StreamReceiveTask getReceivingTask(UUID cfId)
     {
         assert receivers.containsKey(cfId);
-        return receivers.get(cfId).getTransaction();
+        return receivers.get(cfId);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3539a078/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java 
b/test/unit/org/apache/cassandra/db/ScrubTest.java
index fc2faea..757add9 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -26,6 +26,7 @@ import java.nio.file.Paths;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -632,11 +633,11 @@ public class ScrubTest
         assertOrdered(Util.cmd(cfs).filterOn(colName, Operator.EQ, 
1L).build(), numRows / 2);
     }
 
-    private static SSTableMultiWriter createTestWriter(Descriptor descriptor, 
long keyCount, CFMetaData metadata, LifecycleTransaction txn)
+    private static SSTableMultiWriter createTestWriter(Descriptor descriptor, 
long keyCount, CFMetaData metadata, LifecycleNewTracker lifecycleNewTracker)
     {
         SerializationHeader header = new SerializationHeader(true, metadata, 
metadata.partitionColumns(), EncodingStats.NO_STATS);
         MetadataCollector collector = new 
MetadataCollector(metadata.comparator).sstableLevel(0);
-        return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, 
metadata, collector, header, txn));
+        return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, 
metadata, collector, header, lifecycleNewTracker));
     }
 
     private static class TestMultiWriter extends SimpleSSTableMultiWriter
@@ -653,9 +654,9 @@ public class ScrubTest
     private static class TestWriter extends BigTableWriter
     {
         TestWriter(Descriptor descriptor, long keyCount, long repairedAt, 
CFMetaData metadata,
-                   MetadataCollector collector, SerializationHeader header, 
LifecycleTransaction txn)
+                   MetadataCollector collector, SerializationHeader header, 
LifecycleNewTracker lifecycleNewTracker)
         {
-            super(descriptor, keyCount, repairedAt, metadata, collector, 
header, txn);
+            super(descriptor, keyCount, repairedAt, metadata, collector, 
header, lifecycleNewTracker);
         }
 
         @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to