http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index cca59cf,e320f30..f54bc03
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@@ -104,10 -96,10 +104,10 @@@ public abstract class SSTableWriter ext
                                         MetadataCollector metadataCollector,
                                         SerializationHeader header,
                                         Collection<Index> indexes,
-                                        LifecycleTransaction txn)
+                                        LifecycleNewTracker lifecycleNewTracker)
      {
          Factory writerFactory = descriptor.getFormat().getWriterFactory();
-         return writerFactory.open(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn);
-         return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers(descriptor, indexes, lifecycleNewTracker.opType()), lifecycleNewTracker);
++        return writerFactory.open(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers(descriptor, indexes, lifecycleNewTracker.opType()), lifecycleNewTracker);
      }

      public static SSTableWriter create(Descriptor descriptor,
@@@ -118,13 -108,13 +118,13 @@@
                                         int sstableLevel,
                                         SerializationHeader header,
                                         Collection<Index> indexes,
-                                        LifecycleTransaction txn)
+                                        LifecycleNewTracker lifecycleNewTracker)
      {
-         CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
-         return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, indexes, lifecycleNewTracker);
+         TableMetadataRef metadata = Schema.instance.getTableMetadataRef(descriptor);
-         return create(metadata, descriptor, keyCount, repairedAt, pendingRepair, isTransient, sstableLevel, header, indexes, txn);
++        return create(metadata, descriptor, keyCount, repairedAt, pendingRepair, isTransient, sstableLevel, header, indexes, lifecycleNewTracker);
      }

-     public static SSTableWriter create(CFMetaData metadata,
+     public static SSTableWriter create(TableMetadataRef metadata,
                                         Descriptor descriptor,
                                         long keyCount,
                                         long repairedAt,
@@@ -133,26 -121,36 +133,26 @@@
                                         int sstableLevel,
                                         SerializationHeader header,
                                         Collection<Index> indexes,
-                                        LifecycleTransaction txn)
+                                        LifecycleNewTracker lifecycleNewTracker)
      {
-         MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
-         return create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, lifecycleNewTracker);
-     }
-
-     public static SSTableWriter create(String filename,
-                                        long keyCount,
-                                        long repairedAt,
-                                        int sstableLevel,
-                                        SerializationHeader header,
-                                        Collection<Index> indexes,
-                                        LifecycleNewTracker lifecycleNewTracker)
-     {
-         return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, indexes, lifecycleNewTracker);
+         MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(sstableLevel);
-         return create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, collector, header, indexes, txn);
++        return create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, collector, header, indexes, lifecycleNewTracker);
      }

      @VisibleForTesting
-     public static SSTableWriter create(String filename,
+     public static SSTableWriter create(Descriptor descriptor,
                                         long keyCount,
                                         long repairedAt,
+                                        UUID pendingRepair,
+                                        boolean isTransient,
                                         SerializationHeader header,
                                         Collection<Index> indexes,
-                                        LifecycleTransaction txn)
+                                        LifecycleNewTracker lifecycleNewTracker)
      {
-         return create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, 0, header, indexes, txn);
-         Descriptor descriptor = Descriptor.fromFilename(filename);
-         return create(descriptor, keyCount, repairedAt, 0, header, indexes, lifecycleNewTracker);
++        return create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, 0, header, indexes, lifecycleNewTracker);
      }

-     private static Set<Component> components(CFMetaData metadata)
+     private static Set<Component> components(TableMetadata metadata)
      {
          Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA,
                                                                           Component.PRIMARY_INDEX,
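The hunks above swap the concrete LifecycleTransaction out of SSTableWriter's factory methods for the narrower LifecycleNewTracker, so callers that are not full transactions can still be told about newly created sstables. The only calls the writers make on the tracker, trackNew(...) and opType(), are both visible in the diff. A minimal standalone sketch of that shape; every name below other than those two methods is invented for illustration and is not Cassandra API:

    import java.util.ArrayList;
    import java.util.List;

    public class TrackerSketch
    {
        // Stand-ins for the real types; this whole sketch is illustrative.
        interface SSTableLike { String name(); }
        enum Op { WRITE, STREAM, COMPACTION }

        // The two operations the diff actually performs on the tracker:
        // lifecycleNewTracker.trackNew(this) and lifecycleNewTracker.opType().
        interface NewTracker
        {
            void trackNew(SSTableLike sstable); // must be called before any files are created
            Op opType();
        }

        static final class RecordingTracker implements NewTracker
        {
            private final List<SSTableLike> tracked = new ArrayList<>();
            private final Op op;

            RecordingTracker(Op op) { this.op = op; }

            public void trackNew(SSTableLike sstable) { tracked.add(sstable); }
            public Op opType() { return op; }
        }

        public static void main(String[] args)
        {
            NewTracker tracker = new RecordingTracker(Op.STREAM);
            SSTableLike sstable = () -> "na-1-big";
            tracker.trackNew(sstable); // register ownership before creating component files
            System.out.println(tracker.opType() + " tracks " + sstable.name());
        }
    }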
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index d65a7c0,9af7dc0..448808c
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@@ -19,16 -19,11 +19,16 @@@ package org.apache.cassandra.io.sstable
  import java.util.Collection;
  import java.util.Set;
 +import java.util.UUID;

-import org.apache.cassandra.config.CFMetaData;
 +import com.google.common.base.Preconditions;
 +
 +import org.apache.cassandra.io.sstable.SSTable;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.schema.TableMetadataRef;
  import org.apache.cassandra.db.RowIndexEntry;
  import org.apache.cassandra.db.SerializationHeader;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
  import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.Descriptor;
  import org.apache.cassandra.io.sstable.format.*;
@@@ -93,10 -87,9 +93,10 @@@ public class BigFormat implements SSTab
                                    MetadataCollector metadataCollector,
                                    SerializationHeader header,
                                    Collection<SSTableFlushObserver> observers,
-                                   LifecycleTransaction txn)
+                                   LifecycleNewTracker lifecycleNewTracker)
      {
-         return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers, lifecycleNewTracker);
+         SSTable.validateRepairedMetadata(repairedAt, pendingRepair, isTransient);
-         return new BigTableWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers, txn);
++        return new BigTableWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers, lifecycleNewTracker);
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 7513e95,9083cd3..70f568d
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -17,10 -17,15 +17,11 @@@
   */
  package org.apache.cassandra.io.sstable.format.big;

-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
+import java.io.*;
  import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;

+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -73,10 -75,10 +73,10 @@@ public class BigTableWriter extends SST
                            MetadataCollector metadataCollector,
                            SerializationHeader header,
                            Collection<SSTableFlushObserver> observers,
-                           LifecycleTransaction txn)
+                           LifecycleNewTracker lifecycleNewTracker)
      {
-         super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers);
+         super(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers);
-         txn.trackNew(this); // must track before any files are created
+         lifecycleNewTracker.trackNew(this); // must track before any files are created

          if (compression)
          {
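BigFormat.open now validates the (repairedAt, pendingRepair, isTransient) triple before constructing a BigTableWriter. The actual rules live in SSTable.validateRepairedMetadata, which this diff does not show; the sketch below assumes the two invariants implied by transient replication, namely that a repaired sstable cannot still be pending repair and that only an sstable pending repair may be transient:

    import java.util.UUID;

    public class RepairedMetadataSketch
    {
        static final long UNREPAIRED = 0; // assumption: 0 marks "not repaired"

        // Plausible sketch of the precondition; the real check is
        // SSTable.validateRepairedMetadata and may differ in detail.
        static void validate(long repairedAt, UUID pendingRepair, boolean isTransient)
        {
            if (repairedAt != UNREPAIRED && pendingRepair != null)
                throw new IllegalArgumentException("an sstable cannot be repaired and pending repair at once");
            if (isTransient && pendingRepair == null)
                throw new IllegalArgumentException("only sstables pending repair may be transient");
        }

        public static void main(String[] args)
        {
            validate(1530000000000L, null, false);         // repaired: fine
            validate(UNREPAIRED, UUID.randomUUID(), true); // transient, pending repair: fine
            try
            {
                validate(UNREPAIRED, null, true);          // transient without pending repair
            }
            catch (IllegalArgumentException e)
            {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }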
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
index 400f119,0000000..8826381
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
@@@ -1,226 -1,0 +1,227 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.SequentialWriterOption;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadataRef;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+public class BigTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
+{
+    private static final Logger logger = LoggerFactory.getLogger(BigTableZeroCopyWriter.class);
+
+    private final TableMetadataRef metadata;
+    private volatile SSTableReader finalReader;
+    private final Map<Component.Type, SequentialWriter> componentWriters;
+
+    private static final SequentialWriterOption WRITER_OPTION =
+        SequentialWriterOption.newBuilder()
+                              .trickleFsync(false)
+                              .bufferSize(2 << 20)
+                              .bufferType(BufferType.OFF_HEAP)
+                              .build();
+
+    private static final ImmutableSet<Component> SUPPORTED_COMPONENTS =
+        ImmutableSet.of(Component.DATA,
+                        Component.PRIMARY_INDEX,
+                        Component.SUMMARY,
+                        Component.STATS,
+                        Component.COMPRESSION_INFO,
+                        Component.FILTER,
+                        Component.DIGEST,
+                        Component.CRC);
+
+    public BigTableZeroCopyWriter(Descriptor descriptor,
+                                  TableMetadataRef metadata,
-                                  LifecycleTransaction txn,
++                                 LifecycleNewTracker lifecycleNewTracker,
+                                  final Collection<Component> components)
+    {
+        super(descriptor, ImmutableSet.copyOf(components), metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
+
-        txn.trackNew(this);
++        lifecycleNewTracker.trackNew(this);
+        this.metadata = metadata;
+        this.componentWriters = new EnumMap<>(Component.Type.class);
+
+        if (!SUPPORTED_COMPONENTS.containsAll(components))
+            throw new AssertionError(format("Unsupported streaming component detected %s",
+                                            Sets.difference(ImmutableSet.copyOf(components), SUPPORTED_COMPONENTS)));
+
+        for (Component c : components)
+            componentWriters.put(c.type, makeWriter(descriptor, c));
+    }
+
+    private static SequentialWriter makeWriter(Descriptor descriptor, Component component)
+    {
+        return new SequentialWriter(new File(descriptor.filenameFor(component)), WRITER_OPTION, false);
+    }
+
+    private void write(DataInputPlus in, long size, SequentialWriter out) throws FSWriteError
+    {
+        final int BUFFER_SIZE = 1 << 20;
+        long bytesRead = 0;
+        byte[] buff = new byte[BUFFER_SIZE];
+        try
+        {
+            while (bytesRead < size)
+            {
+                int toRead = (int) Math.min(size - bytesRead, BUFFER_SIZE);
+                in.readFully(buff, 0, toRead);
+                int count = Math.min(toRead, BUFFER_SIZE);
+                out.write(buff, 0, count);
+                bytesRead += count;
+            }
+            out.sync();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, out.getPath());
+        }
+    }
+
+    @Override
+    public boolean append(UnfilteredRowIterator partition)
+    {
+        throw new UnsupportedOperationException("Operation not supported by BigTableBlockWriter");
+    }
+
+    @Override
+    public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
+    {
+        return finish(openResult);
+    }
+
+    @Override
+    public Collection<SSTableReader> finish(boolean openResult)
+    {
+        setOpenResult(openResult);
+        return finished();
+    }
+
+    @Override
+    public Collection<SSTableReader> finished()
+    {
+        if (finalReader == null)
+            finalReader = SSTableReader.open(descriptor, components, metadata);
+
+        return ImmutableList.of(finalReader);
+    }
+
+    @Override
+    public SSTableMultiWriter setOpenResult(boolean openResult)
+    {
+        return null;
+    }
+
+    @Override
+    public long getFilePointer()
+    {
+        return 0;
+    }
+
+    @Override
+    public TableId getTableId()
+    {
+        return metadata.id;
+    }
+
+    @Override
+    public Throwable commit(Throwable accumulate)
+    {
+        for (SequentialWriter writer : componentWriters.values())
+            accumulate = writer.commit(accumulate);
+        return accumulate;
+    }
+
+    @Override
+    public Throwable abort(Throwable accumulate)
+    {
+        for (SequentialWriter writer : componentWriters.values())
+            accumulate = writer.abort(accumulate);
+        return accumulate;
+    }
+
+    @Override
+    public void prepareToCommit()
+    {
+        for (SequentialWriter writer : componentWriters.values())
+            writer.prepareToCommit();
+    }
+
+    @Override
+    public void close()
+    {
+        for (SequentialWriter writer : componentWriters.values())
+            writer.close();
+    }
+
+    public void writeComponent(Component.Type type, DataInputPlus in, long size)
+    {
+        logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
+
+        if (in instanceof RebufferingByteBufDataInputPlus)
+            write((RebufferingByteBufDataInputPlus) in, size, componentWriters.get(type));
+        else
+            write(in, size, componentWriters.get(type));
+    }
+
+    private void write(RebufferingByteBufDataInputPlus in, long size, SequentialWriter writer)
+    {
+        logger.info("Block Writing component to {} length {}", writer.getPath(), prettyPrintMemory(size));
+
+        try
+        {
+            long bytesWritten = in.consumeUntil(writer, size);
+
+            if (bytesWritten != size)
+                throw new IOException(format("Failed to read correct number of bytes from channel %s", writer));
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, writer.getPath());
+        }
+    }
+}
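The write(DataInputPlus, long, SequentialWriter) method above is a bounded copy loop: it pulls exactly size bytes through a 1 MiB buffer and syncs once at the end. The same loop restated over java.io types, with the Cassandra-specific DataInputPlus and SequentialWriter replaced by standard-library stand-ins:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    public class BoundedCopy
    {
        // Same shape as the copy loop in the diff, over plain java.io types.
        static void copy(DataInputStream in, long size, OutputStream out) throws IOException
        {
            final int BUFFER_SIZE = 1 << 20; // 1 MiB chunks, as in the diff
            byte[] buff = new byte[BUFFER_SIZE];
            long bytesRead = 0;
            while (bytesRead < size)
            {
                int toRead = (int) Math.min(size - bytesRead, BUFFER_SIZE);
                in.readFully(buff, 0, toRead); // readFully: no short reads
                out.write(buff, 0, toRead);
                bytesRead += toRead;
            }
            out.flush(); // the real writer syncs to disk here instead
        }

        public static void main(String[] args) throws IOException
        {
            byte[] src = new byte[3 * (1 << 20) + 17];
            ByteArrayOutputStream dst = new ByteArrayOutputStream();
            copy(new DataInputStream(new ByteArrayInputStream(src)), src.length, dst);
            System.out.println("copied " + dst.size() + " bytes");
        }
    }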
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index 5e60f7a,cf28bf7..874097d
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@@ -436,16 -309,14 +436,16 @@@ public abstract class AbstractReplicati
      {
          try
          {
-             if (Integer.parseInt(rf) < 0)
+             ReplicationFactor rf = ReplicationFactor.fromString(s);
+             if (rf.hasTransientReplicas())
              {
-                 throw new ConfigurationException("Replication factor must be non-negative; found " + rf);
+                 if (DatabaseDescriptor.getNumTokens() > 1)
-                     throw new ConfigurationException("Transient replication is not supported with vnodes yet");
++                    throw new ConfigurationException(String.format("Transient replication is not supported with vnodes yet"));
              }
          }
-         catch (NumberFormatException e2)
+         catch (IllegalArgumentException e)
          {
-             throw new ConfigurationException("Replication factor must be numeric; found " + rf);
+             throw new ConfigurationException(e.getMessage());
          }
      }
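validateReplicationFactor now delegates parsing to ReplicationFactor.fromString and converts any IllegalArgumentException into a ConfigurationException, additionally rejecting transient replicas while vnodes (num_tokens > 1) are enabled. The diff does not show the string format itself; assuming the "<all>/<transient>" convention used by transient replication, a parsing sketch:

    public class ReplicationFactorSketch
    {
        final int allReplicas;
        final int transientReplicas;

        ReplicationFactorSketch(int allReplicas, int transientReplicas)
        {
            if (allReplicas < 0 || transientReplicas < 0)
                throw new IllegalArgumentException("Replication factor must be non-negative; found " + allReplicas + "/" + transientReplicas);
            if (transientReplicas > 0 && transientReplicas >= allReplicas)
                throw new IllegalArgumentException("At least one replica must be full; found " + allReplicas + "/" + transientReplicas);
            this.allReplicas = allReplicas;
            this.transientReplicas = transientReplicas;
        }

        // Assumed format: "3" (all full) or "5/2" (5 replicas, 2 transient);
        // the real parser is ReplicationFactor.fromString.
        static ReplicationFactorSketch fromString(String s)
        {
            int slash = s.indexOf('/');
            if (slash < 0)
                return new ReplicationFactorSketch(Integer.parseInt(s.trim()), 0);
            return new ReplicationFactorSketch(Integer.parseInt(s.substring(0, slash).trim()),
                                               Integer.parseInt(s.substring(slash + 1).trim()));
        }

        boolean hasTransientReplicas() { return transientReplicas > 0; }

        public static void main(String[] args)
        {
            System.out.println(fromString("3").hasTransientReplicas());   // false
            System.out.println(fromString("5/2").hasTransientReplicas()); // true
        }
    }

Note that NumberFormatException extends IllegalArgumentException, which is why the single catch block in the hunk above covers malformed numbers and invalid factor combinations alike.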
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 0a96f4c,5388dd6..31c60be
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -128,8 -196,83 +128,8 @@@ public class StreamReceiveTask extends
                  task.session.taskCompleted(task);
                  return;
              }
-             cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-             hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
-             hasCDC = cfs.metadata.params.cdc;
-
-             Collection<SSTableReader> readers = task.sstables;
-
-             try (Refs<SSTableReader> refs = Refs.ref(readers))
-             {
-                 /*
-                  * We have a special path for views and for CDC.
-                  *
-                  * For views, since the view requires cleaning up any pre-existing state, we must put all partitions
-                  * through the same write path as normal mutations. This also ensures any 2is are also updated.
-                  *
-                  * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they
-                  * can be archived by the CDC process on discard.
-                  */
-                 if (hasViews || hasCDC)
-                 {
-                     for (SSTableReader reader : readers)
-                     {
-                         Keyspace ks = Keyspace.open(reader.getKeyspaceName());
-                         try (ISSTableScanner scanner = reader.getScanner())
-                         {
-                             while (scanner.hasNext())
-                             {
-                                 try (UnfilteredRowIterator rowIterator = scanner.next())
-                                 {
-                                     Mutation m = new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata)));
-
-                                     // MV *can* be applied unsafe if there's no CDC on the CFS as we flush below
-                                     // before transaction is done.
-                                     //
-                                     // If the CFS has CDC, however, these updates need to be written to the CommitLog
-                                     // so they get archived into the cdc_raw folder
-                                     ks.apply(m, hasCDC, true, false);
-                                 }
-                             }
-                         }
-                     }
-                 }
-                 else
-                 {
-                     task.finishTransaction();
-
-                     logger.debug("[Stream #{}] Received {} sstables from {} ({})", task.session.planId(), readers.size(), task.session.peer, readers);
-                     // add sstables and build secondary indexes
-                     cfs.addSSTables(readers);
-                     cfs.indexManager.buildAllIndexesBlocking(readers);
-
-                     //invalidate row and counter cache
-                     if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
-                     {
-                         List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
-                         readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
-                         Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
-                         task.receiver.finished();;
-                         if (cfs.isRowCacheEnabled())
-                         {
-                             int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
-                             if (invalidatedKeys > 0)
-                                 logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
-                                              "receive task completed.", task.session.planId(), invalidatedKeys,
-                                              cfs.keyspace.getName(), cfs.getTableName());
-                         }
-
-                         if (cfs.metadata.isCounter())
-                         {
-                             int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
-                             if (invalidatedKeys > 0)
-                                 logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
-                                              "receive task completed.", task.session.planId(), invalidatedKeys,
-                                              cfs.keyspace.getName(), cfs.getTableName());
-                         }
-                     }
-                 }
-             }
++            task.receiver.finished();
              task.session.taskCompleted(task);
          }
          catch (Throwable t)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
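The StreamReceiveTask hunk above deletes the inline views/CDC handling and leaves only task.receiver.finished(); the policy spelled out in the removed comment, that view and CDC tables must replay streamed partitions through the normal write path while all other tables can adopt the sstables wholesale, now lives behind the receiver. A sketch of that decision; every type here is invented for illustration, not Cassandra API:

    import java.util.List;

    public class ReceiverSketch
    {
        interface StreamedTable
        {
            boolean hasViews();
            boolean hasCDC();
            List<String> streamedPartitions(); // stand-in for scanning received sstables
            void applyThroughWritePath(String partition, boolean writeCommitLog);
            void addSSTablesDirectly();
        }

        static void finished(StreamedTable table)
        {
            if (table.hasViews() || table.hasCDC())
            {
                // Views must rebuild derived state, and CDC mutations must reach
                // the commit log (so they land in cdc_raw), so replay each
                // partition through the normal write path.
                for (String partition : table.streamedPartitions())
                    table.applyThroughWritePath(partition, table.hasCDC());
            }
            else
            {
                // Fast path: adopt the streamed sstables wholesale and let the
                // table build indexes and invalidate caches afterwards.
                table.addSSTablesDirectly();
            }
        }
    }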