[ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13076246#comment-13076246 ]

Sylvain Lebresne commented on CASSANDRA-2901:
---------------------------------------------

bq. My thinking was that allowing per-deserializer buffers will keep the 
pipeline full better.

Makes sense.

bq. It's not so bad, because we can assume N >= 2 sstables, and we restrict 
each deserializer to 1/N of in-memory limit. So I think we come close to 2x 
overall. (And if we don't, I'd rather adjust our estimate, than make it less 
performant/more complex.)

I still think that this pipeline is a bit too deep. I agree that each 
deserializer has a 1/N in-memory limit, but that means that for a given row (a 
given key if you prefer), we can have up to in_memory_limit worth of data in memory 
(since we have N sstables). And the number of such rows that can be in memory 
at a given time is:
  # 2 * mem_limit total for the deserializers (each one can have a row in its 
queue and one being deserialized)
  # mem_limit for each MergeTask in memory. Given that the MergeTask 
executor has nb_processors threads and a queue of size nb_processors, this means up to 
2 * nb_processors * mem_limit.
  # if we want to be exact, the reducer thread can also hold one MergeTask while 
it is blocked on submitting to the executor.
That is, we can have up to a (2 * nb_processors + 3)x blowup; on 8 or 16 
cores that's 19x or 35x, far from the 2x.

Now I understand the willingness to keep the pipeline full, and that in general 
we shouldn't get too close to this theoretical limit, but I do think that as is, 
it's too easy to OOM, or to force people to use a very low in-memory limit, 
which would make this less useful than it should be.

I'm also not proposing to complicate things. What I would do is use direct 
hand-off for the merge task executor and update the maxInMemory limit we 
give to each deserializer. We could do only the limit update, but we would then 
need to set it even lower, and I'm not sure that would be a good trade-off.
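
To illustrate what I mean by direct hand-off, something along these lines (a 
minimal sketch only; the class and method names are placeholders, not the actual 
ParallelCompactionIterable code):

{code}
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MergeExecutorSketch
{
    // Direct hand-off: the SynchronousQueue never buffers a MergeTask, it is
    // either handed to an idle worker or the submit is rejected. The handler
    // below turns the rejection into a blocking put(), so the reducer thread
    // simply waits for a free worker instead of queueing more rows in memory.
    public static ThreadPoolExecutor newMergeExecutor(int nbProcessors)
    {
        return new ThreadPoolExecutor(nbProcessors, nbProcessors,
                                      0L, TimeUnit.MILLISECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      new RejectedExecutionHandler()
                                      {
                                          public void rejectedExecution(Runnable task, ThreadPoolExecutor executor)
                                          {
                                              try
                                              {
                                                  // block until an idle worker takes the task
                                                  executor.getQueue().put(task);
                                              }
                                              catch (InterruptedException e)
                                              {
                                                  throw new AssertionError(e);
                                              }
                                          }
                                      });
    }
}
{code}

With that in place, at most nb_processors MergeTasks exist at a time (plus the 
one the reducer thread is blocked on handing off), which is what makes a tighter 
per-deserializer limit workable.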

Other comments:
* We feed NotifyingSSTableIdentityIterator to LazilyCompactedRow. However, in 
LCR.getEstimatedColumnCount(), we won't report the right count because NSSTII 
is not an instance of SSTII. Same thing in LCR.iterator: we won't call reset 
correctly on the wrapped SSTII. (Imho we should add getColumnCount and reset to 
the IColumnIterator interface, or make a sub-interface with those, because that 
'if ... instanceof' business is a bit error prone/ugly; see the sketch after this list.)
* We could maybe say in cassandra.yaml how multithreaded_compaction differs from 
concurrent_compactors, and that multithreaded_compaction is likely only useful for 
SSDs?
* The bytesRead "race" should also be fixed in CompactionIterable, and the 
'let's use a static final comparator' remark stands there too. But maybe we should 
fix that elsewhere.
* I would have put the code in CompactedRow.close() at the end of 
LCR.write() instead of adding a new method, as it avoids forgetting to call 
close and I don't see a good reason why close would need to be separate.
* We can make PreCompactedRow.removeDeletedAndOldShards a public method and use 
it in PCI.MergeTask.
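
For the sub-interface idea above, I'm thinking of something as simple as this 
(just a sketch; the interface name is a placeholder, and it assumes the existing 
IColumnIterator):

{code}
import org.apache.cassandra.db.columniterator.IColumnIterator;

// Sketch only: both SSTableIdentityIterator and NotifyingSSTableIdentityIterator
// would implement it, so LazilyCompactedRow can call these methods directly
// instead of the 'if (iter instanceof SSTableIdentityIterator)' checks.
public interface ICountableColumnIterator extends IColumnIterator
{
    /** Number of columns in the row being iterated (for getEstimatedColumnCount()). */
    public int getColumnCount();

    /** Rewind to the start of the row so it can be iterated again (from LCR.iterator()). */
    public void reset();
}
{code}

The instanceof checks in getEstimatedColumnCount() and iterator() then collapse 
to a cast, or disappear entirely if LCR takes the sub-interface directly.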


> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.3
>
>         Attachments: 2901-v2.txt, 2901-v3.txt, 2901.patch
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We 
> probably want to continue doing read+deserialize and serialize+write 
> together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + 
> deserialize (a row at a time). A thread pool (one per core?) merging 
> corresponding rows from each input sstable. One thread doing serialize + 
> writing the output (this has to wait for the merge threads to complete 
> in-order, obviously). This should take us from being CPU bound on SSDs (since 
> only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work 
> ahead of the merge stage. (I.e. for each input sstable you will have up to 
> one row in a queue waiting to be merged, and the reader thread working on the 
> next.) Seems quite reasonable on that front.  You'll also want a small queue 
> size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to 
> try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have 
> compaction threads tuned to low priority, by default, so the impact on the 
> rest of the system won't be very different. Nor do we expect to have so many 
> input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, 
> and fall back to the old non-parallel code there. I don't see any sane way to 
> parallelize large-row compactions.
