Repository: cassandra
Updated Branches:
  refs/heads/trunk 030c775ee -> 4b27287cd
Add extension points in storage and streaming classes

Patch by Blake Eggleston; reviewed by marcuse for CASSANDRA-11173

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4b27287c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4b27287c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4b27287c

Branch: refs/heads/trunk
Commit: 4b27287cd93088148d85d1a6ec9df34601f0c741
Parents: 030c775
Author: Blake Eggleston <bdeggles...@gmail.com>
Authored: Tue Feb 16 15:06:00 2016 -0800
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Tue Feb 23 15:45:07 2016 +0100

----------------------------------------------------------------------
 .../apache/cassandra/db/ColumnFamilyStore.java  |  1 +
 .../db/SinglePartitionReadCommand.java          | 28 ++++---
 .../org/apache/cassandra/db/StorageHook.java    | 86 ++++++++++++++++++++
 .../apache/cassandra/streaming/StreamHook.java  | 57 +++++++++++++
 .../cassandra/streaming/StreamReader.java       |  4 +-
 .../cassandra/streaming/StreamSession.java      |  3 +-
 .../cassandra/streaming/StreamTransferTask.java |  3 +-
 7 files changed, 165 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b27287c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 9b113c4..fa95063 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1217,6 +1217,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             DecoratedKey key = update.partitionKey();
             invalidateCachedPartition(key);
             metric.samplers.get(Sampler.WRITES).addSample(key.getKey(), key.hashCode(), 1);
+            StorageHook.instance.reportWrite(metadata.cfId, update);
             metric.writeLatency.addNano(System.nanoTime() - start);
             if(timeDelta < Long.MAX_VALUE)
                 metric.colUpdateTimeDeltaHistogram.update(timeDelta);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b27287c/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 1a0b400..9712497 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -547,7 +547,7 @@ public class SinglePartitionReadCommand extends ReadCommand
                 @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception,
                                               // or through the closing of the final merged iterator
-                UnfilteredRowIteratorWithLowerBound iter = makeIterator(sstable, true);
+                UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, true);
                 if (!sstable.isRepaired())
                     oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
@@ -567,7 +567,7 @@ public class SinglePartitionReadCommand extends ReadCommand
                 @SuppressWarnings("resource") // 'iter' is added to iterators which is close on exception,
                                               // or through the closing of the final merged iterator
-                UnfilteredRowIteratorWithLowerBound iter = makeIterator(sstable, false);
+                UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, false);
                 if (!sstable.isRepaired())
                     oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
@@ -582,6 +582,7 @@ public class SinglePartitionReadCommand extends ReadCommand
             if (iterators.isEmpty())
                 return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed());

+            StorageHook.instance.reportRead(cfs.metadata.cfId, partitionKey());
             return withStateTracking(withSSTablesIterated(iterators, cfs.metric));
         }
         catch (RuntimeException | Error e)
@@ -609,15 +610,17 @@ public class SinglePartitionReadCommand extends ReadCommand
         return clusteringIndexFilter().shouldInclude(sstable);
     }

-    private UnfilteredRowIteratorWithLowerBound makeIterator(final SSTableReader sstable, boolean applyThriftTransformation)
+    private UnfilteredRowIteratorWithLowerBound makeIterator(ColumnFamilyStore cfs, final SSTableReader sstable, boolean applyThriftTransformation)
     {
-        return new UnfilteredRowIteratorWithLowerBound(partitionKey(),
-                                                       sstable,
-                                                       clusteringIndexFilter(),
-                                                       columnFilter(),
-                                                       isForThrift(),
-                                                       nowInSec(),
-                                                       applyThriftTransformation);
+        return StorageHook.instance.makeRowIteratorWithLowerBound(cfs,
+                                                                  partitionKey(),
+                                                                  sstable,
+                                                                  clusteringIndexFilter(),
+                                                                  columnFilter(),
+                                                                  isForThrift(),
+                                                                  nowInSec(),
+                                                                  applyThriftTransformation);
+
     }

     /**
@@ -724,7 +727,7 @@ public class SinglePartitionReadCommand extends ReadCommand
                     // We need to get the partition deletion and include it if it's live. In any case though, we're done with that sstable.
                     sstable.incrementReadCount();
-                    try (UnfilteredRowIterator iter = sstable.iterator(partitionKey(), Slices.ALL, columnFilter(), filter.isReversed(), isForThrift()))
+                    try (UnfilteredRowIterator iter = StorageHook.instance.makeRowIterator(cfs, sstable, partitionKey(), Slices.ALL, columnFilter(), filter.isReversed(), isForThrift()))
                     {
                         if (iter.partitionLevelDeletion().isLive())
                         {
@@ -737,7 +740,7 @@ public class SinglePartitionReadCommand extends ReadCommand
                 Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation);
                 sstable.incrementReadCount();
-                try (UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), isForThrift()))
+                try (UnfilteredRowIterator iter = StorageHook.instance.makeRowIterator(cfs, sstable, partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), isForThrift()))
                 {
                     if (iter.isEmpty())
                         continue;
@@ -754,6 +757,7 @@ public class SinglePartitionReadCommand extends ReadCommand
         DecoratedKey key = result.partitionKey();
         cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+        StorageHook.instance.reportRead(cfs.metadata.cfId, partitionKey());

         // "hoist up" the requested data into a more recent sstable
         if (sstablesIterated > cfs.getMinimumCompactionThreshold()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b27287c/src/java/org/apache/cassandra/db/StorageHook.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/StorageHook.java b/src/java/org/apache/cassandra/db/StorageHook.java
new file mode 100644
index 0000000..0f27adb
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/StorageHook.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.UUID;
+
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIteratorWithLowerBound;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
+
+public interface StorageHook
+{
+    public static final StorageHook instance = createHook();
+
+    public void reportWrite(UUID cfid, PartitionUpdate partitionUpdate);
+    public void reportRead(UUID cfid, DecoratedKey key);
+    public UnfilteredRowIteratorWithLowerBound makeRowIteratorWithLowerBound(ColumnFamilyStore cfs,
+                                                                             DecoratedKey partitionKey,
+                                                                             SSTableReader sstable,
+                                                                             ClusteringIndexFilter filter,
+                                                                             ColumnFilter selectedColumns,
+                                                                             boolean isForThrift,
+                                                                             int nowInSec,
+                                                                             boolean applyThriftTransformation);
+    public UnfilteredRowIterator makeRowIterator(ColumnFamilyStore cfs,
+                                                 SSTableReader sstable,
+                                                 DecoratedKey key,
+                                                 Slices slices,
+                                                 ColumnFilter selectedColumns,
+                                                 boolean reversed,
+                                                 boolean isForThrift);
+
+    static StorageHook createHook()
+    {
+        String className = System.getProperty("cassandra.storage_hook");
+        if (className != null)
+        {
+            return FBUtilities.construct(className, StorageHook.class.getSimpleName());
+        }
+        else
+        {
+            return new StorageHook()
+            {
+                public void reportWrite(UUID cfid, PartitionUpdate partitionUpdate) {}
+
+                public void reportRead(UUID cfid, DecoratedKey key) {}
+
+                public UnfilteredRowIteratorWithLowerBound makeRowIteratorWithLowerBound(ColumnFamilyStore cfs, DecoratedKey partitionKey, SSTableReader sstable, ClusteringIndexFilter filter, ColumnFilter selectedColumns, boolean isForThrift, int nowInSec, boolean applyThriftTransformation)
+                {
+                    return new UnfilteredRowIteratorWithLowerBound(partitionKey,
+                                                                   sstable,
+                                                                   filter,
+                                                                   selectedColumns,
+                                                                   isForThrift,
+                                                                   nowInSec,
+                                                                   applyThriftTransformation);
+                }
+
+                public UnfilteredRowIterator makeRowIterator(ColumnFamilyStore cfs, SSTableReader sstable, DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
+                {
+                    return sstable.iterator(key, slices, selectedColumns, reversed, isForThrift);
+                }
+            };
+        }
+    }
+}
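A custom StorageHook is instantiated through FBUtilities.construct, so it needs to be a public class with a no-arg constructor. As a minimal sketch of what a plugin could look like (the com.example.hooks package and class name below are hypothetical, not part of this patch), a hook could count the reads and writes reported by the call sites above while delegating iterator construction to the same defaults the built-in no-op hook uses, leaving read behaviour unchanged:

package com.example.hooks; // hypothetical package, not part of this patch

import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Slices;
import org.apache.cassandra.db.StorageHook;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIteratorWithLowerBound;
import org.apache.cassandra.io.sstable.format.SSTableReader;

public class CountingStorageHook implements StorageHook
{
    // Cheap counters: these methods sit on the read/write hot paths,
    // so implementations should avoid heavy work here.
    private final AtomicLong writes = new AtomicLong();
    private final AtomicLong reads = new AtomicLong();

    public void reportWrite(UUID cfid, PartitionUpdate partitionUpdate)
    {
        writes.incrementAndGet();
    }

    public void reportRead(UUID cfid, DecoratedKey key)
    {
        reads.incrementAndGet();
    }

    // Delegate to the same iterator construction as the default hook.
    public UnfilteredRowIteratorWithLowerBound makeRowIteratorWithLowerBound(ColumnFamilyStore cfs, DecoratedKey partitionKey, SSTableReader sstable, ClusteringIndexFilter filter, ColumnFilter selectedColumns, boolean isForThrift, int nowInSec, boolean applyThriftTransformation)
    {
        return new UnfilteredRowIteratorWithLowerBound(partitionKey, sstable, filter, selectedColumns, isForThrift, nowInSec, applyThriftTransformation);
    }

    public UnfilteredRowIterator makeRowIterator(ColumnFamilyStore cfs, SSTableReader sstable, DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
    {
        return sstable.iterator(key, slices, selectedColumns, reversed, isForThrift);
    }
}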
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b27287c/src/java/org/apache/cassandra/streaming/StreamHook.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamHook.java b/src/java/org/apache/cassandra/streaming/StreamHook.java
new file mode 100644
index 0000000..d610297
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamHook.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
+import org.apache.cassandra.utils.FBUtilities;
+
+public interface StreamHook
+{
+    public static final StreamHook instance = createHook();
+
+    public OutgoingFileMessage reportOutgoingFile(StreamSession session, SSTableReader sstable, OutgoingFileMessage message);
+    public void reportStreamFuture(StreamSession session, StreamResultFuture future);
+    public void reportIncomingFile(ColumnFamilyStore cfs, SSTableMultiWriter writer, StreamSession session, int sequenceNumber);
+
+    static StreamHook createHook()
+    {
+        String className = System.getProperty("cassandra.stream_hook");
+        if (className != null)
+        {
+            return FBUtilities.construct(className, StreamHook.class.getSimpleName());
+        }
+        else
+        {
+            return new StreamHook()
+            {
+                public OutgoingFileMessage reportOutgoingFile(StreamSession session, SSTableReader sstable, OutgoingFileMessage message)
+                {
+                    return message;
+                }
+
+                public void reportStreamFuture(StreamSession session, StreamResultFuture future) {}
+
+                public void reportIncomingFile(ColumnFamilyStore cfs, SSTableMultiWriter writer, StreamSession session, int sequenceNumber) {}
+            };
+        }
+    }
+}
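StreamHook follows the same lookup pattern via the cassandra.stream_hook system property. Note that reportOutgoingFile returns an OutgoingFileMessage: StreamTransferTask (below) stores whatever is returned, so an implementation may substitute a replacement message rather than just observing it. A minimal sketch that only logs the three callbacks, again with a hypothetical package and class name, might look like this:

package com.example.hooks; // hypothetical package, not part of this patch

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.streaming.StreamHook;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.messages.OutgoingFileMessage;

public class LoggingStreamHook implements StreamHook
{
    private static final Logger logger = LoggerFactory.getLogger(LoggingStreamHook.class);

    // Observe (or replace) each outgoing file message; here we pass it
    // through unchanged, as the default implementation does.
    public OutgoingFileMessage reportOutgoingFile(StreamSession session, SSTableReader sstable, OutgoingFileMessage message)
    {
        logger.info("Sending {} in stream plan {}", sstable.getFilename(), session.planId());
        return message;
    }

    public void reportStreamFuture(StreamSession session, StreamResultFuture future)
    {
        logger.info("Stream session initialized for plan {}", session.planId());
    }

    public void reportIncomingFile(ColumnFamilyStore cfs, SSTableMultiWriter writer, StreamSession session, int sequenceNumber)
    {
        logger.info("Receiving file #{} for {}.{}", sequenceNumber, cfs.keyspace.getName(), cfs.name);
    }
}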
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b27287c/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 838664d..7348027 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -143,7 +143,9 @@ public class StreamReader
         if (localDir == null)
             throw new IOException("Insufficient disk space to store " + totalSize + " bytes");

-        return new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), header);
+        RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), header);
+        StreamHook.instance.reportIncomingFile(cfs, writer, session, fileSeqNum);
+        return writer;
     }

     protected void drain(InputStream dis, long bytesRead) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b27287c/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 9078acc..67e6b84 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -39,8 +39,6 @@ import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.metrics.StreamingMetrics;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.messages.*;
@@ -222,6 +220,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     public void init(StreamResultFuture streamResult)
     {
         this.streamResult = streamResult;
+        StreamHook.instance.reportStreamFuture(this, streamResult);
     }

     public void start()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b27287c/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index f14abd2..e8d0cae 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -23,14 +23,12 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;

 import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;

 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Ref;
-import org.apache.cassandra.utils.concurrent.RefCounted;

 /**
  * StreamTransferTask sends sections of SSTable files in certain ColumnFamily.
@@ -56,6 +54,7 @@ public class StreamTransferTask extends StreamTask
     {
         assert ref.get() != null && cfId.equals(ref.get().metadata.cfId);
         OutgoingFileMessage message = new OutgoingFileMessage(ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel());
+        message = StreamHook.instance.reportOutgoingFile(session, ref.get(), message);
         files.put(message.header.sequenceNumber, message);
         totalSize += message.header.size();
     }
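Both hooks are resolved once, at class initialization, from their system properties; when a property is unset, the anonymous no-op implementations above are used and behaviour is unchanged. Assuming the hypothetical example classes sketched earlier are on the server classpath, they could be enabled with startup flags along the lines of:

    -Dcassandra.storage_hook=com.example.hooks.CountingStorageHook
    -Dcassandra.stream_hook=com.example.hooks.LoggingStreamHook

for instance by appending them to JVM_OPTS in cassandra-env.sh.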