[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13076246#comment-13076246 ]
Sylvain Lebresne commented on CASSANDRA-2901:
---------------------------------------------

bq. My thinking was that allowing per-deserializer buffers will keep the pipeline full better.

Makes sense.

bq. It's not so bad, because we can assume N >= 2 sstables, and we restrict each deserializer to 1/N of in-memory limit. So I think we come close to 2x overall. (And if we don't, I'd rather adjust our estimate, than make it less performant/more complex.)

I still think that this pipeline is a bit too deep. I agree that each deserializer has a 1/N in-memory limit, but that means that for a given row (a given key if you prefer), we can have up to in_memory_limit worth of data in memory (since we have N sstables). And the number of such rows that can be in memory at a given time is:
# 2 * mem_limit across the deserializers (for each one, the row in the queue and the one being deserialized)
# mem_limit for each MergeTask in memory. Given that the MergeTask executor has nb_processors threads and a queue of size nb_processors, this means up to 2 * nb_processors * mem_limit.
# if we want to be exact, the reducer thread can also hold one MergeTask while it is blocked on submitting to the executor.

That is, we can have up to a (2 * nb_processors + 3) blowup. On 8 or 16 cores, we're far from the 2x.

Now I understand the willingness to keep the pipeline full, and that in general we shouldn't be too close to this theoretical limit, but I do think that as is, it's too easy to OOM, or to force people to use a very low in-memory limit, which would make this less useful than it should be. I'm also not proposing to complicate things. What I would do is use direct hand-off for the merge task executor and update the maxInMemory limit we give to each deserializer. We could do only the limit update, but we would then need to put it even lower and I'm not sure it would be a good trade-off.

Other comments:
* We feed NotifyingSSTableIdentityIterator to LazilyCompactedRow.
However, in LCR.getEstimatedColumnCount(), we won't report the right count because NSSTII is not an instance of SSTII. Same thing in LCR.iterator: we won't call reset correctly on the wrapped SSTII (imho we should add getColumnCount and reset to the IColumnIterator interface (or make a sub-interface with those) because that 'if...instanceof' business is a bit error-prone/ugly).
* We could maybe explain in cassandra.yaml how multithreaded_compaction differs from concurrent_compactors, and that multithreaded_compaction is likely only useful for SSDs?
* The bytesRead "race" should also be fixed in CompactionIterable, and the 'let's use a static final comparator' remark applies there too. But maybe we should fix that elsewhere.
* I would have put the code in CompactedRow.close() at the end of LCR.write() instead of adding a new method, as it avoids forgetting to call close, and I don't see a good reason why close would need to be separate.
* We can make PreCompactedRow.removeDeletedAndOldShards a public method and use it in PCI.MergeTask.

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.3
>
>         Attachments: 2901-v2.txt, 2901-v3.txt, 2901.patch
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable.
> One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front. You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.
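
To make the blowup estimate from the comment concrete, here is a rough sketch of the arithmetic. It counts, in units of the in-memory limit, how many merged rows can be held at once: two for the deserializers, one per executor thread, one per executor queue slot, and one for the reducer. The function and parameter names are hypothetical, and the `direct_handoff` case assumes that a direct hand-off executor simply has no queue slots; neither comes from the patch itself.

```python
def max_rows_in_flight(nb_processors: int, direct_handoff: bool = False) -> int:
    """Upper bound on full rows (each up to mem_limit) held in memory at once.

    Hypothetical helper: mirrors the counting in the comment, not actual patch code.
    """
    # Each of the N deserializers holds up to 2 * (mem_limit / N):
    # one row queued plus one being deserialized, so 2 * mem_limit overall.
    deserializers = 2
    # One MergeTask per executor thread.
    executor_threads = nb_processors
    # Assumption: direct hand-off means the executor keeps no queued tasks.
    executor_queue = 0 if direct_handoff else nb_processors
    # The reducer can hold one MergeTask while blocked on submission.
    reducer = 1
    return deserializers + executor_threads + executor_queue + reducer


for cores in (8, 16):
    queued = max_rows_in_flight(cores)
    handoff = max_rows_in_flight(cores, direct_handoff=True)
    print(f"{cores} cores: {queued}x with a queue, {handoff}x with direct hand-off")
# 8 cores: 19x with a queue, 11x with direct hand-off
# 16 cores: 35x with a queue, 19x with direct hand-off
```

Under these assumptions, direct hand-off alone roughly halves the worst case but still leaves it well above 2x, which is consistent with also lowering the limit given to each deserializer.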