[ 
https://issues.apache.org/jira/browse/CASSANDRA-18773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755832#comment-17755832
 ] 

Cameron Zemek commented on CASSANDRA-18773:
-------------------------------------------

Yes this is not limited to just major compactions. That was just a way I could 
reproduce the issue reliably. Same thing happening with switching from STCS to 
LCS for a customer, that operation has been going for 2 weeks now. It has 1.5Tb 
of disk usage. Disk benchmarks show the disk able todo 120Mb/s with random 
reads of 16kb chunks. So the operation should have completed in a day.

Picking random node, it has 5 compactions going with compactionthroughput set 
to 64Mb/s. iotop shows max of 26Mb/s.

I commented out a bunch of code in the hot paths:

 
{code:java}
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 2eb5d8fde7..bd72117632 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -532,7 +532,7 @@ public abstract class UnfilteredRowIterators
         public void close()
         {
             // This will close the input iterators
-            FileUtils.closeQuietly(mergeIterator);
+//            FileUtils.closeQuietly(mergeIterator);             if (listener 
!= null)
                 listener.close();
diff --git a/src/java/org/apache/cassandra/utils/MergeIterator.java 
b/src/java/org/apache/cassandra/utils/MergeIterator.java
index 6713dd0a43..5744dfb89b 100644
--- a/src/java/org/apache/cassandra/utils/MergeIterator.java
+++ b/src/java/org/apache/cassandra/utils/MergeIterator.java
@@ -42,7 +42,13 @@ public abstract class MergeIterator<In,Out> extends 
AbstractIterator<Out> implem
                  ? new TrivialOneToOne<>(sources, reducer)
                  : new OneToOne<>(sources, reducer);
         }
-        return new ManyToOne<>(sources, comparator, reducer);
+        ArrayList<Iterator<In>> filtered = new ArrayList<>(sources.size());
+        for (Iterator<In> it : sources) {
+            if (it != null) {
+                filtered.add(it);
+            }
+        }
+        return new ManyToOne<>(filtered, comparator, reducer);
     }     public Iterable<? extends Iterator<In>> iterators()
@@ -361,7 +367,8 @@ public abstract class MergeIterator<In,Out> extends 
AbstractIterator<Out> implem
             this.iter = iter;
             this.comp = comp;
             this.idx = idx;
-            this.lowerBound = iter instanceof IteratorWithLowerBound ? 
((IteratorWithLowerBound<In>)iter).lowerBound() : null;
+            this.lowerBound = null;
+//            this.lowerBound = iter instanceof IteratorWithLowerBound ? 
((IteratorWithLowerBound<In>)iter).lowerBound() : null;
         }         /** @return this if our iterator had an item, and it is now 
available, otherwise null */ {code}
still spending a significant chunk of time in UnfilteredRowMergeIterator, with 
bulk of that in ManyToOne constructor.

 

Is there not a way could manage the sstable merging without creating so many 
objects like ManyToOne? Eg. have a state object for each sstable and use that 
throughout the whole compaction to manage the merging. This is what 
cassandra-sstable-tools does. It keeps the current partition key for each 
sstable and has all the sstables in Priority queue (readerQueue). Eg.

 
{code:java}
ArrayList<SSTableReader> toMerge = new ArrayList(readerQueue.size());

while (!readerQueue.isEmpty()) {
    SSTableReader reader = this.readerQueue.remove();
    toMerge.add(reader);
    DecoratedKey key = reader.key;
    while ((reader = readerQueue.peek()) != null && reader.key.equals(key)) {
        readerQueue.remove();
        toMerge.add(reader);
    }
    doMerge(toMerge);
    for (Reader r : toMerge) {
        readerNext(reader); // advance the reader and add back to priority 
queue if more.
    }
    toMerge.clear();
}{code}
 

That is each sstable reader is positioned ready to read its current partition. 
Grab all readers that belong to the partiton to be merged. doMerge would 
iterate the rows in the readers and perform the merging. Then readerNext would 
read the next partition key and put it back into the priority queue. Doesn't 
have to be priority queue, just some efficient way to determine which sstables 
to include in the partition merge.

> Compactions are slow
> --------------------
>
>                 Key: CASSANDRA-18773
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-18773
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Local/Compaction
>            Reporter: Cameron Zemek
>            Priority: Normal
>             Fix For: 4.0.x, 4.1.x, 5.0.x, 5.x
>
>         Attachments: flamegraph.png, stress.yaml
>
>
> I have noticed that compactions involving a lot of sstables are very slow 
> (for example major compactions). I have attached a cassandra stress profile 
> that can generate such a dataset under ccm. In my local test I have 2567 
> sstables at 4Mb each.
> I added code to track wall clock time of various parts of the code. One 
> problematic part is ManyToOne constructor. Tracing through the code for every 
> partition creating a ManyToOne for all the sstable iterators for each 
> partition. In my local test get a measy 60Kb/sec read speed, and bottlenecked 
> on single core CPU (since this code is single threaded) with it spending 85% 
> of the wall clock time in ManyToOne constructor.
> As another datapoint to show its the merge iterator part of the code using 
> the cfstats from [https://github.com/instaclustr/cassandra-sstable-tools/] 
> which reads all the sstables but does no merging gets 26Mb/sec read speed.
> Tracking back from ManyToOne call I see this in 
> UnfilteredPartitionIterators::merge
> {code:java}
>                 for (int i = 0; i < toMerge.size(); i++)
>                 {
>                     if (toMerge.get(i) == null)
>                     {
>                         if (null == empty)
>                             empty = EmptyIterators.unfilteredRow(metadata, 
> partitionKey, isReverseOrder);
>                         toMerge.set(i, empty);
>                     }
>                 }
>  {code}
> Not sure what purpose of creating these empty rows are. But on a whim I 
> removed all these empty iterators before passing to ManyToOne and then all 
> the wall clock time shifted to CompactionIterator::hasNext() and read speed 
> increased to 1.5Mb/s.
> So there are further bottlenecks in this code path it seems, but the first is 
> this ManyToOne and having to build it for every partition read.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to