http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index cca59cf,e320f30..f54bc03
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@@ -104,10 -96,10 +104,10 @@@ public abstract class SSTableWriter ext
                                         MetadataCollector metadataCollector,
                                         SerializationHeader header,
                                         Collection<Index> indexes,
-                                        LifecycleTransaction txn)
+                                        LifecycleNewTracker lifecycleNewTracker)
      {
          Factory writerFactory = descriptor.getFormat().getWriterFactory();
-         return writerFactory.open(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn);
 -        return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers(descriptor, indexes, lifecycleNewTracker.opType()), lifecycleNewTracker);
++        return writerFactory.open(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers(descriptor, indexes, lifecycleNewTracker.opType()), lifecycleNewTracker);
      }
  
      public static SSTableWriter create(Descriptor descriptor,
@@@ -118,13 -108,13 +118,13 @@@
                                         int sstableLevel,
                                         SerializationHeader header,
                                         Collection<Index> indexes,
-                                        LifecycleTransaction txn)
+                                        LifecycleNewTracker lifecycleNewTracker)
      {
 -        CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
 -        return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, indexes, lifecycleNewTracker);
 +        TableMetadataRef metadata = Schema.instance.getTableMetadataRef(descriptor);
-         return create(metadata, descriptor, keyCount, repairedAt, pendingRepair, isTransient, sstableLevel, header, indexes, txn);
++        return create(metadata, descriptor, keyCount, repairedAt, pendingRepair, isTransient, sstableLevel, header, indexes, lifecycleNewTracker);
      }
  
 -    public static SSTableWriter create(CFMetaData metadata,
 +    public static SSTableWriter create(TableMetadataRef metadata,
                                         Descriptor descriptor,
                                         long keyCount,
                                         long repairedAt,
@@@ -133,26 -121,36 +133,26 @@@
                                         int sstableLevel,
                                         SerializationHeader header,
                                         Collection<Index> indexes,
-                                        LifecycleTransaction txn)
+                                        LifecycleNewTracker lifecycleNewTracker)
      {
 -        MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
 -        return create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, lifecycleNewTracker);
 -    }
 -
 -    public static SSTableWriter create(String filename,
 -                                       long keyCount,
 -                                       long repairedAt,
 -                                       int sstableLevel,
 -                                       SerializationHeader header,
 -                                       Collection<Index> indexes,
 -                                       LifecycleNewTracker lifecycleNewTracker)
 -    {
 -        return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, indexes, lifecycleNewTracker);
 +        MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(sstableLevel);
-         return create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, collector, header, indexes, txn);
++        return create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, collector, header, indexes, lifecycleNewTracker);
      }
  
      @VisibleForTesting
 -    public static SSTableWriter create(String filename,
 +    public static SSTableWriter create(Descriptor descriptor,
                                         long keyCount,
                                         long repairedAt,
 +                                       UUID pendingRepair,
 +                                       boolean isTransient,
                                         SerializationHeader header,
                                         Collection<Index> indexes,
-                                        LifecycleTransaction txn)
+                                        LifecycleNewTracker lifecycleNewTracker)
      {
-         return create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, 0, header, indexes, txn);
 -        Descriptor descriptor = Descriptor.fromFilename(filename);
 -        return create(descriptor, keyCount, repairedAt, 0, header, indexes, lifecycleNewTracker);
++        return create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, 0, header, indexes, lifecycleNewTracker);
      }
  
 -    private static Set<Component> components(CFMetaData metadata)
 +    private static Set<Component> components(TableMetadata metadata)
      {
          Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA,
                  Component.PRIMARY_INDEX,

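Context for the hunks above: every create(...) overload now accepts the narrower
LifecycleNewTracker interface in place of a full LifecycleTransaction, and
threads the new pendingRepair/isTransient arguments through to the writer
factory. A minimal call-site sketch (not part of the commit), assuming the
Cassandra classpath and in-scope descriptor, keyCount, sstableLevel, header,
indexes and txn variables; a LifecycleTransaction satisfies LifecycleNewTracker:

    // Hedged sketch: creating a writer for an unrepaired, non-transient sstable.
    SSTableWriter writer = SSTableWriter.create(descriptor,
                                                keyCount,
                                                ActiveRepairService.UNREPAIRED_SSTABLE, // repairedAt
                                                null,   // pendingRepair: no incremental repair session
                                                false,  // isTransient: holds full, not transient, data
                                                sstableLevel,
                                                header,
                                                indexes,
                                                txn);   // any LifecycleNewTracker implementation
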
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index d65a7c0,9af7dc0..448808c
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@@ -19,16 -19,11 +19,16 @@@ package org.apache.cassandra.io.sstable
  
  import java.util.Collection;
  import java.util.Set;
 +import java.util.UUID;
  
 -import org.apache.cassandra.config.CFMetaData;
 +import com.google.common.base.Preconditions;
 +
 +import org.apache.cassandra.io.sstable.SSTable;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.schema.TableMetadataRef;
  import org.apache.cassandra.db.RowIndexEntry;
  import org.apache.cassandra.db.SerializationHeader;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
  import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.Descriptor;
  import org.apache.cassandra.io.sstable.format.*;
@@@ -93,10 -87,9 +93,10 @@@ public class BigFormat implements SSTab
                                    MetadataCollector metadataCollector,
                                    SerializationHeader header,
                                    Collection<SSTableFlushObserver> observers,
-                                   LifecycleTransaction txn)
+                                   LifecycleNewTracker lifecycleNewTracker)
          {
 -            return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers, lifecycleNewTracker);
 +            SSTable.validateRepairedMetadata(repairedAt, pendingRepair, isTransient);
-             return new BigTableWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers, txn);
++            return new BigTableWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers, lifecycleNewTracker);
          }
      }
  

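The interesting addition in this hunk is the validateRepairedMetadata guard that
runs before the writer is constructed. The diff does not show its body; a
plausible sketch of the invariant it enforces (an assumption, not the committed
code, using the Preconditions import added above) would be:

    // Assumed shape of the guard: a finalized repairedAt time and a pending
    // incremental repair session are mutually exclusive, and only sstables
    // that are part of a pending repair are expected to be transient.
    public static void validateRepairedMetadata(long repairedAt, UUID pendingRepair, boolean isTransient)
    {
        Preconditions.checkArgument(pendingRepair == null || repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE,
                                    "pendingRepair cannot be set on an already repaired sstable");
        Preconditions.checkArgument(!isTransient || pendingRepair != null,
                                    "only sstables pending repair may be transient");
    }
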
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 7513e95,9083cd3..70f568d
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -17,10 -17,15 +17,11 @@@
   */
  package org.apache.cassandra.io.sstable.format.big;
  
 -import java.io.File;
 -import java.io.FileOutputStream;
 -import java.io.IOException;
 +import java.io.*;
  import java.nio.ByteBuffer;
 -import java.util.Collection;
 -import java.util.Map;
 -import java.util.Optional;
 +import java.util.*;
  
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -73,10 -75,10 +73,10 @@@ public class BigTableWriter extends SST
                            MetadataCollector metadataCollector, 
                            SerializationHeader header,
                            Collection<SSTableFlushObserver> observers,
-                           LifecycleTransaction txn)
+                           LifecycleNewTracker lifecycleNewTracker)
      {
 -        super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers);
 +        super(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers);
-         txn.trackNew(this); // must track before any files are created
+         lifecycleNewTracker.trackNew(this); // must track before any files are created
  
          if (compression)
          {

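The retained comment ("must track before any files are created") is the heart of
this hunk: the tracker has to learn about the sstable before any component file
exists on disk, otherwise an aborted transaction cannot delete files it never
heard about. A self-contained toy illustration of that ordering contract
(hypothetical names, not Cassandra code):

    import java.io.File;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class TrackThenCreateDemo
    {
        static final List<File> tracked = new ArrayList<>();

        // stands in for LifecycleNewTracker.trackNew
        static void trackNew(File f) { tracked.add(f); }

        static void abortAndCleanUp()
        {
            for (File f : tracked)
                f.delete(); // safe: the tracker saw every name before creation
        }

        public static void main(String[] args) throws IOException
        {
            File data = new File("demo-Data.db");
            trackNew(data);       // register first...
            data.createNewFile(); // ...then touch the filesystem; a later failure is recoverable
            abortAndCleanUp();
        }
    }
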
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
index 400f119,0000000..8826381
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
@@@ -1,226 -1,0 +1,227 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.Collection;
 +import java.util.EnumMap;
 +import java.util.Map;
 +
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.ImmutableSet;
 +import com.google.common.collect.Sets;
++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.compress.BufferType;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.SSTable;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.SequentialWriter;
 +import org.apache.cassandra.io.util.SequentialWriterOption;
 +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
 +import org.apache.cassandra.schema.TableId;
 +import org.apache.cassandra.schema.TableMetadataRef;
 +
 +import static java.lang.String.format;
 +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
 +
 +public class BigTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(BigTableZeroCopyWriter.class);
 +
 +    private final TableMetadataRef metadata;
 +    private volatile SSTableReader finalReader;
 +    private final Map<Component.Type, SequentialWriter> componentWriters;
 +
 +    private static final SequentialWriterOption WRITER_OPTION =
 +        SequentialWriterOption.newBuilder()
 +                              .trickleFsync(false)
 +                              .bufferSize(2 << 20)
 +                              .bufferType(BufferType.OFF_HEAP)
 +                              .build();
 +
 +    private static final ImmutableSet<Component> SUPPORTED_COMPONENTS =
 +        ImmutableSet.of(Component.DATA,
 +                        Component.PRIMARY_INDEX,
 +                        Component.SUMMARY,
 +                        Component.STATS,
 +                        Component.COMPRESSION_INFO,
 +                        Component.FILTER,
 +                        Component.DIGEST,
 +                        Component.CRC);
 +
 +    public BigTableZeroCopyWriter(Descriptor descriptor,
 +                                  TableMetadataRef metadata,
-                                   LifecycleTransaction txn,
++                                  LifecycleNewTracker lifecycleNewTracker,
 +                                  final Collection<Component> components)
 +    {
 +        super(descriptor, ImmutableSet.copyOf(components), metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
 +
-         txn.trackNew(this);
++        lifecycleNewTracker.trackNew(this);
 +        this.metadata = metadata;
 +        this.componentWriters = new EnumMap<>(Component.Type.class);
 +
 +        if (!SUPPORTED_COMPONENTS.containsAll(components))
 +            throw new AssertionError(format("Unsupported streaming component detected %s",
 +                                            Sets.difference(ImmutableSet.copyOf(components), SUPPORTED_COMPONENTS)));
 +
 +        for (Component c : components)
 +            componentWriters.put(c.type, makeWriter(descriptor, c));
 +    }
 +
 +    private static SequentialWriter makeWriter(Descriptor descriptor, Component component)
 +    {
 +        return new SequentialWriter(new File(descriptor.filenameFor(component)), WRITER_OPTION, false);
 +    }
 +
 +    private void write(DataInputPlus in, long size, SequentialWriter out) throws FSWriteError
 +    {
 +        final int BUFFER_SIZE = 1 << 20;
 +        long bytesRead = 0;
 +        byte[] buff = new byte[BUFFER_SIZE];
 +        try
 +        {
 +            while (bytesRead < size)
 +            {
 +                int toRead = (int) Math.min(size - bytesRead, BUFFER_SIZE);
 +                in.readFully(buff, 0, toRead);
 +                int count = Math.min(toRead, BUFFER_SIZE);
 +                out.write(buff, 0, count);
 +                bytesRead += count;
 +            }
 +            out.sync();
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, out.getPath());
 +        }
 +    }
 +
 +    @Override
 +    public boolean append(UnfilteredRowIterator partition)
 +    {
 +        throw new UnsupportedOperationException("Operation not supported by BigTableBlockWriter");
 +    }
 +
 +    @Override
 +    public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
 +    {
 +        return finish(openResult);
 +    }
 +
 +    @Override
 +    public Collection<SSTableReader> finish(boolean openResult)
 +    {
 +        setOpenResult(openResult);
 +        return finished();
 +    }
 +
 +    @Override
 +    public Collection<SSTableReader> finished()
 +    {
 +        if (finalReader == null)
 +            finalReader = SSTableReader.open(descriptor, components, metadata);
 +
 +        return ImmutableList.of(finalReader);
 +    }
 +
 +    @Override
 +    public SSTableMultiWriter setOpenResult(boolean openResult)
 +    {
 +        return null;
 +    }
 +
 +    @Override
 +    public long getFilePointer()
 +    {
 +        return 0;
 +    }
 +
 +    @Override
 +    public TableId getTableId()
 +    {
 +        return metadata.id;
 +    }
 +
 +    @Override
 +    public Throwable commit(Throwable accumulate)
 +    {
 +        for (SequentialWriter writer : componentWriters.values())
 +            accumulate = writer.commit(accumulate);
 +        return accumulate;
 +    }
 +
 +    @Override
 +    public Throwable abort(Throwable accumulate)
 +    {
 +        for (SequentialWriter writer : componentWriters.values())
 +            accumulate = writer.abort(accumulate);
 +        return accumulate;
 +    }
 +
 +    @Override
 +    public void prepareToCommit()
 +    {
 +        for (SequentialWriter writer : componentWriters.values())
 +            writer.prepareToCommit();
 +    }
 +
 +    @Override
 +    public void close()
 +    {
 +        for (SequentialWriter writer : componentWriters.values())
 +            writer.close();
 +    }
 +
 +    public void writeComponent(Component.Type type, DataInputPlus in, long size)
 +    {
 +        logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
 +
 +        if (in instanceof RebufferingByteBufDataInputPlus)
 +            write((RebufferingByteBufDataInputPlus) in, size, componentWriters.get(type));
 +        else
 +            write(in, size, componentWriters.get(type));
 +    }
 +
 +    private void write(RebufferingByteBufDataInputPlus in, long size, SequentialWriter writer)
 +    {
 +        logger.info("Block Writing component to {} length {}", writer.getPath(), prettyPrintMemory(size));
 +
 +        try
 +        {
 +            long bytesWritten = in.consumeUntil(writer, size);
 +
 +            if (bytesWritten != size)
 +                throw new IOException(format("Failed to read correct number of bytes from channel %s", writer));
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, writer.getPath());
 +        }
 +    }
 +}

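Taken together, the new class streams whole sstable components to disk verbatim
(no partition iteration, hence the UnsupportedOperationException on append) and
only opens a reader once every component writer has committed. A hedged sketch
of a receiving-side caller, assuming the Cassandra classpath and the same
package as the writer; sizeOf(...) and the session plumbing are placeholders,
not APIs from this commit:

    final class ZeroCopyReceiveSketch
    {
        Collection<SSTableReader> receive(Descriptor descriptor,
                                          TableMetadataRef metadata,
                                          LifecycleNewTracker tracker,
                                          Collection<Component> components,
                                          DataInputPlus in)
        {
            BigTableZeroCopyWriter writer = new BigTableZeroCopyWriter(descriptor, metadata, tracker, components);
            for (Component component : components)
                writer.writeComponent(component.type, in, sizeOf(component)); // raw bytes, no row deserialization
            return writer.finish(true); // opens an SSTableReader over the streamed files
        }

        // placeholder for the component length carried in the stream header
        private long sizeOf(Component component) { return 0L; }
    }
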
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index 5e60f7a,cf28bf7..874097d
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@@ -436,16 -309,14 +436,16 @@@ public abstract class AbstractReplicati
      {
          try
          {
 -            if (Integer.parseInt(rf) < 0)
 +            ReplicationFactor rf = ReplicationFactor.fromString(s);
 +            if (rf.hasTransientReplicas())
              {
 -                throw new ConfigurationException("Replication factor must be non-negative; found " + rf);
 +                if (DatabaseDescriptor.getNumTokens() > 1)
-                     throw new ConfigurationException("Transient replication is not supported with vnodes yet");
++                    throw new ConfigurationException(String.format("Transient replication is not supported with vnodes yet"));
              }
          }
 -        catch (NumberFormatException e2)
 +        catch (IllegalArgumentException e)
          {
 -            throw new ConfigurationException("Replication factor must be numeric; found " + rf);
 +            throw new ConfigurationException(e.getMessage());
          }
      }
  

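The rewritten validation delegates parsing to ReplicationFactor.fromString and
only then checks the vnodes restriction. Under the transient replication design
this merge brings in, the factor string is assumed to take a "total/transient"
form in addition to the plain integer; an illustrative sketch of that
assumption, not exhaustive:

    ReplicationFactor full  = ReplicationFactor.fromString("3");   // 3 full replicas
    ReplicationFactor mixed = ReplicationFactor.fromString("3/1"); // 3 replicas total, 1 transient
    assert !full.hasTransientReplicas() && mixed.hasTransientReplicas();
    // The code above rejects the second form whenever num_tokens > 1 (vnodes),
    // and surfaces malformed strings as ConfigurationExceptions.
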
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 0a96f4c,5388dd6..31c60be
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -128,8 -196,83 +128,8 @@@ public class StreamReceiveTask extends 
                      task.session.taskCompleted(task);
                      return;
                  }
 -                cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 -                hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
 -                hasCDC = cfs.metadata.params.cdc;
 -
 -                Collection<SSTableReader> readers = task.sstables;
 -
 -                try (Refs<SSTableReader> refs = Refs.ref(readers))
 -                {
 -                    /*
 -                     * We have a special path for views and for CDC.
 -                     *
 -                     * For views, since the view requires cleaning up any pre-existing state, we must put all partitions
 -                     * through the same write path as normal mutations. This also ensures any 2is are also updated.
 -                     *
 -                     * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they
 -                     * can be archived by the CDC process on discard.
 -                     */
 -                    if (hasViews || hasCDC)
 -                    {
 -                        for (SSTableReader reader : readers)
 -                        {
 -                            Keyspace ks = Keyspace.open(reader.getKeyspaceName());
 -                            try (ISSTableScanner scanner = reader.getScanner())
 -                            {
 -                                while (scanner.hasNext())
 -                                {
 -                                    try (UnfilteredRowIterator rowIterator = scanner.next())
 -                                    {
 -                                        Mutation m = new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata)));
 -
 -                                        // MV *can* be applied unsafe if there's no CDC on the CFS as we flush below
 -                                        // before transaction is done.
 -                                        //
 -                                        // If the CFS has CDC, however, these updates need to be written to the CommitLog
 -                                        // so they get archived into the cdc_raw folder
 -                                        ks.apply(m, hasCDC, true, false);
 -                                    }
 -                                }
 -                            }
 -                        }
 -                    }
 -                    else
 -                    {
 -                        task.finishTransaction();
 -
 -                        logger.debug("[Stream #{}] Received {} sstables from {} ({})", task.session.planId(), readers.size(), task.session.peer, readers);
 -                        // add sstables and build secondary indexes
 -                        cfs.addSSTables(readers);
 -                        cfs.indexManager.buildAllIndexesBlocking(readers);
 -
 -                        //invalidate row and counter cache
 -                        if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
 -                        {
 -                            List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
 -                            readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
 -                            Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
  
-                 task.receiver.finished();;
 -                            if (cfs.isRowCacheEnabled())
 -                            {
 -                                int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
 -                                if (invalidatedKeys > 0)
 -                                    logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
 -                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
 -                                                 cfs.keyspace.getName(), cfs.getTableName());
 -                            }
 -
 -                            if (cfs.metadata.isCounter())
 -                            {
 -                                int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
 -                                if (invalidatedKeys > 0)
 -                                    logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
 -                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
 -                                                 cfs.keyspace.getName(), cfs.getTableName());
 -                            }
 -                        }
 -                    }
 -                }
++                task.receiver.finished();
                  task.session.taskCompleted(task);
              }
              catch (Throwable t)

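Despite its size, the StreamReceiveTask hunk reads as a refactor rather than a
feature removal: the views/CDC write-path handling, sstable registration, and
cache invalidation that used to live inline move behind task.receiver, so the
task body shrinks to a completion signal. A rough sketch of the seam this
delegates to; only finished() is visible in the diff, the other members are
assumptions for illustration:

    // Hypothetical shape of the receiver abstraction targeted by this diff.
    interface StreamReceiver
    {
        void received(SSTableMultiWriter sstable); // accumulate an incoming sstable
        void finished();                           // apply: write path for views/CDC, addSSTables otherwise
        void abort();                              // discard partially received state
    }
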
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------

