Repository: cassandra
Updated Branches:
  refs/heads/trunk 030c775ee -> 4b27287cd


Add extension points in storage and streaming classes

Patch by Blake Eggleston; reviewed by marcuse for CASSANDRA-11173


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4b27287c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4b27287c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4b27287c

Branch: refs/heads/trunk
Commit: 4b27287cd93088148d85d1a6ec9df34601f0c741
Parents: 030c775
Author: Blake Eggleston <bdeggles...@gmail.com>
Authored: Tue Feb 16 15:06:00 2016 -0800
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Tue Feb 23 15:45:07 2016 +0100

----------------------------------------------------------------------
 .../apache/cassandra/db/ColumnFamilyStore.java  |  1 +
 .../db/SinglePartitionReadCommand.java          | 28 ++++---
 .../org/apache/cassandra/db/StorageHook.java    | 86 ++++++++++++++++++++
 .../apache/cassandra/streaming/StreamHook.java  | 57 +++++++++++++
 .../cassandra/streaming/StreamReader.java       |  4 +-
 .../cassandra/streaming/StreamSession.java      |  3 +-
 .../cassandra/streaming/StreamTransferTask.java |  3 +-
 7 files changed, 165 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b27287c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 9b113c4..fa95063 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1217,6 +1217,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         DecoratedKey key = update.partitionKey();
         invalidateCachedPartition(key);
         metric.samplers.get(Sampler.WRITES).addSample(key.getKey(), key.hashCode(), 1);
+        StorageHook.instance.reportWrite(metadata.cfId, update);
         metric.writeLatency.addNano(System.nanoTime() - start);
         if(timeDelta < Long.MAX_VALUE)
             metric.colUpdateTimeDeltaHistogram.update(timeDelta);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b27287c/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 1a0b400..9712497 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -547,7 +547,7 @@ public class SinglePartitionReadCommand extends ReadCommand
 
                 @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception,
                                               // or through the closing of the final merged iterator
-                UnfilteredRowIteratorWithLowerBound iter = makeIterator(sstable, true);
+                UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, true);
                 if (!sstable.isRepaired())
                     oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
 
@@ -567,7 +567,7 @@ public class SinglePartitionReadCommand extends ReadCommand
 
                     @SuppressWarnings("resource") // 'iter' is added to iterators which is close on exception,
                                                   // or through the closing of the final merged iterator
-                    UnfilteredRowIteratorWithLowerBound iter = makeIterator(sstable, false);
+                    UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, false);
                     if (!sstable.isRepaired())
                         oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
 
@@ -582,6 +582,7 @@ public class SinglePartitionReadCommand extends ReadCommand
             if (iterators.isEmpty())
                 return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed());
 
+            StorageHook.instance.reportRead(cfs.metadata.cfId, partitionKey());
             return withStateTracking(withSSTablesIterated(iterators, cfs.metric));
         }
         catch (RuntimeException | Error e)
@@ -609,15 +610,17 @@ public class SinglePartitionReadCommand extends ReadCommand
         return clusteringIndexFilter().shouldInclude(sstable);
     }
 
-    private UnfilteredRowIteratorWithLowerBound makeIterator(final SSTableReader sstable, boolean applyThriftTransformation)
+    private UnfilteredRowIteratorWithLowerBound makeIterator(ColumnFamilyStore cfs, final SSTableReader sstable, boolean applyThriftTransformation)
     {
-        return new UnfilteredRowIteratorWithLowerBound(partitionKey(),
-                                                       sstable,
-                                                       clusteringIndexFilter(),
-                                                       columnFilter(),
-                                                       isForThrift(),
-                                                       nowInSec(),
-                                                       applyThriftTransformation);
+        return StorageHook.instance.makeRowIteratorWithLowerBound(cfs,
+                                                                  partitionKey(),
+                                                                  sstable,
+                                                                  clusteringIndexFilter(),
+                                                                  columnFilter(),
+                                                                  isForThrift(),
+                                                                  nowInSec(),
+                                                                  applyThriftTransformation);
+
     }
 
     /**
@@ -724,7 +727,7 @@ public class SinglePartitionReadCommand extends ReadCommand
 
                 // We need to get the partition deletion and include it if it's live. In any case though, we're done with that sstable.
                 sstable.incrementReadCount();
-                try (UnfilteredRowIterator iter = sstable.iterator(partitionKey(), Slices.ALL, columnFilter(), filter.isReversed(), isForThrift()))
+                try (UnfilteredRowIterator iter = StorageHook.instance.makeRowIterator(cfs, sstable, partitionKey(), Slices.ALL, columnFilter(), filter.isReversed(), isForThrift()))
                 {
                     if (iter.partitionLevelDeletion().isLive())
                     {
@@ -737,7 +740,7 @@ public class SinglePartitionReadCommand extends ReadCommand
 
             Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation);
             sstable.incrementReadCount();
-            try (UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), isForThrift()))
+            try (UnfilteredRowIterator iter = StorageHook.instance.makeRowIterator(cfs, sstable, partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), isForThrift()))
             {
                 if (iter.isEmpty())
                     continue;
@@ -754,6 +757,7 @@ public class SinglePartitionReadCommand extends ReadCommand
 
         DecoratedKey key = result.partitionKey();
         cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+        StorageHook.instance.reportRead(cfs.metadata.cfId, partitionKey());
 
         // "hoist up" the requested data into a more recent sstable
         if (sstablesIterated > cfs.getMinimumCompactionThreshold()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b27287c/src/java/org/apache/cassandra/db/StorageHook.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/StorageHook.java b/src/java/org/apache/cassandra/db/StorageHook.java
new file mode 100644
index 0000000..0f27adb
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/StorageHook.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.UUID;
+
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIteratorWithLowerBound;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
+
+public interface StorageHook
+{
+    public static final StorageHook instance = createHook();
+
+    public void reportWrite(UUID cfid, PartitionUpdate partitionUpdate);
+    public void reportRead(UUID cfid, DecoratedKey key);
+    public UnfilteredRowIteratorWithLowerBound makeRowIteratorWithLowerBound(ColumnFamilyStore cfs,
+                                                                      DecoratedKey partitionKey,
+                                                                      SSTableReader sstable,
+                                                                      ClusteringIndexFilter filter,
+                                                                      ColumnFilter selectedColumns,
+                                                                      boolean isForThrift,
+                                                                      int nowInSec,
+                                                                      boolean applyThriftTransformation);
+    public UnfilteredRowIterator makeRowIterator(ColumnFamilyStore cfs,
+                                                 SSTableReader sstable,
+                                                 DecoratedKey key,
+                                                 Slices slices,
+                                                 ColumnFilter selectedColumns,
+                                                 boolean reversed,
+                                                 boolean isForThrift);
+
+    static StorageHook createHook()
+    {
+        String className =  System.getProperty("cassandra.storage_hook");
+        if (className != null)
+        {
+            return FBUtilities.construct(className, StorageHook.class.getSimpleName());
+        }
+        else
+        {
+            return new StorageHook()
+            {
+                public void reportWrite(UUID cfid, PartitionUpdate partitionUpdate) {}
+
+                public void reportRead(UUID cfid, DecoratedKey key) {}
+
+                public UnfilteredRowIteratorWithLowerBound makeRowIteratorWithLowerBound(ColumnFamilyStore cfs, DecoratedKey partitionKey, SSTableReader sstable, ClusteringIndexFilter filter, ColumnFilter selectedColumns, boolean isForThrift, int nowInSec, boolean applyThriftTransformation)
+                {
+                    return new UnfilteredRowIteratorWithLowerBound(partitionKey,
+                                                                   sstable,
+                                                                   filter,
+                                                                   selectedColumns,
+                                                                   isForThrift,
+                                                                   nowInSec,
+                                                                   applyThriftTransformation);
+                }
+
+                public UnfilteredRowIterator makeRowIterator(ColumnFamilyStore cfs, SSTableReader sstable, DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
+                {
+                    return sstable.iterator(key, slices, selectedColumns, reversed, isForThrift);
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b27287c/src/java/org/apache/cassandra/streaming/StreamHook.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamHook.java b/src/java/org/apache/cassandra/streaming/StreamHook.java
new file mode 100644
index 0000000..d610297
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamHook.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
+import org.apache.cassandra.utils.FBUtilities;
+
+public interface StreamHook
+{
+    public static final StreamHook instance = createHook();
+
+    public OutgoingFileMessage reportOutgoingFile(StreamSession session, SSTableReader sstable, OutgoingFileMessage message);
+    public void reportStreamFuture(StreamSession session, StreamResultFuture future);
+    public void reportIncomingFile(ColumnFamilyStore cfs, SSTableMultiWriter writer, StreamSession session, int sequenceNumber);
+
+    static StreamHook createHook()
+    {
+        String className =  System.getProperty("cassandra.stream_hook");
+        if (className != null)
+        {
+            return FBUtilities.construct(className, StreamHook.class.getSimpleName());
+        }
+        else
+        {
+            return new StreamHook()
+            {
+                public OutgoingFileMessage reportOutgoingFile(StreamSession session, SSTableReader sstable, OutgoingFileMessage message)
+                {
+                    return message;
+                }
+
+                public void reportStreamFuture(StreamSession session, StreamResultFuture future) {}
+
+                public void reportIncomingFile(ColumnFamilyStore cfs, SSTableMultiWriter writer, StreamSession session, int sequenceNumber) {}
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b27287c/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 838664d..7348027 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -143,7 +143,9 @@ public class StreamReader
         if (localDir == null)
             throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
 
-        return new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), header);
+        RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), header);
+        StreamHook.instance.reportIncomingFile(cfs, writer, session, fileSeqNum);
+        return writer;
     }
 
     protected void drain(InputStream dis, long bytesRead) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b27287c/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 9078acc..67e6b84 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -39,8 +39,6 @@ import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.metrics.StreamingMetrics;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.messages.*;
@@ -222,6 +220,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     public void init(StreamResultFuture streamResult)
     {
         this.streamResult = streamResult;
+        StreamHook.instance.reportStreamFuture(this, streamResult);
     }
 
     public void start()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b27287c/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index f14abd2..e8d0cae 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -23,14 +23,12 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Ref;
-import org.apache.cassandra.utils.concurrent.RefCounted;
 
 /**
  * StreamTransferTask sends sections of SSTable files in certain ColumnFamily.
@@ -56,6 +54,7 @@ public class StreamTransferTask extends StreamTask
     {
         assert ref.get() != null && cfId.equals(ref.get().metadata.cfId);
         OutgoingFileMessage message = new OutgoingFileMessage(ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel());
+        message = StreamHook.instance.reportOutgoingFile(session, ref.get(), message);
         files.put(message.header.sequenceNumber, message);
         totalSize += message.header.size();
     }

Reply via email to