[ 
https://issues.apache.org/jira/browse/CASSANDRA-6602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14158131#comment-14158131
 ] 

Björn Hegerfors commented on CASSANDRA-6602:
--------------------------------------------

OK, so a writeup on some implementation details of this strategy might be in 
order. Maybe there's a better place to put something this long.

But first, thanks for the patch, Marcus! Everything looks good.

Now, Marcus mentioned being worried about sliding windows as far as SSTable 
candidate selection goes, or at least that's what it sounded like. That was 
indeed one of my main concerns when I started implementing DTCS. After all, 
time is constantly moving, so how do we avoid the situation where some SSTables 
compact and then immediately slide over into some "SSTables older than x" time 
window and need to compact again? Or maybe, before the first compaction, a few 
of the oldest of those SSTables manage to slip over already, causing the 
window for bigger SSTables to hold more than min_compaction_threshold SSTables 
and triggering a compaction with mixed-size SSTables. Or something else like that.

It can easily get messy, so I aimed for an algorithm with as little of a 
"sliding windows" effect as possible. What I came up with is really all summed 
up by the Target class. A target represents a time span between the Unix epoch* 
and now. An SSTable is "on target" if its *min* timestamp is within a given 
target's time span.
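
To make that concrete, here is a minimal sketch of what such a target boils 
down to. The names are illustrative rather than copied from the patch, but the 
idea is the same: a window of "size" time units at index "position", which an 
SSTable hits when its min timestamp divides down to that index.

    // Sketch only: a target covers timestamps t with t / size == position,
    // i.e. the half-open interval [size * position, size * (position + 1)).
    class Target
    {
        final long size;      // window length, in the same unit as the SSTable timestamps
        final long position;  // window index since the epoch

        Target(long size, long position)
        {
            this.size = size;
            this.position = position;
        }

        boolean onTarget(long minTimestamp)
        {
            // An SSTable is "on target" when its *min* timestamp falls inside this window.
            return minTimestamp / size == position;
        }
    }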

This is the first decision that I would like some input on. My thinking was 
that if there's a big SSTable with timestamps ranging from old to new, picking 
the *max* timestamp would have made this data participate in too frequent 
compactions, because SSTables considered new compact more often than those 
considered old. STCS creates these kinds of SSTables; if you run DTCS from the 
start, it won't matter which timestamp you choose as the representative timestamp 
of an SSTable. So choosing the *min* timestamp was essentially for improved 
compatibility with STCS.

How these targets are chosen is the most interesting part. I tend to think of 
targets backwards in time. The first target is the one covering the most recent 
timestamps. The most naive idea would be to let the first target be the last 
hour, the next target the hour before, and so on for 24 hours. Then target #25 
would be the day from 24 hours ago to 48 hours ago. 6 of those would get us a 
week back. Then the same thing for months and years, etc. This approach has a 
number of issues. First, it needs a concept of a calendar that I would rather 
not hard-code, since not everybody will want it that way. Plus, it would compact 
by wildly varying factors like 24, 7 and 4 (but a month is not exactly 4 weeks, 
except February, sometimes... ugh!). So I quickly ditched the calendar for a 
configurable factor, called "base", and an initial target size that I call 
"time_unit". You could, for example, set base to 3 and timeUnit to an hour, 
expressed in microseconds (if microseconds is what your timestamps use; I 
recommend sticking with microseconds, which appears to be the CQL standard. 
Timestamp inconsistencies mess with DTCS, as I've involuntarily experienced in 
my tests). Then the calendar-less version of the above would pick the last hour 
as the initial target and 2 more for the hours before that. Then before that, 
2 3-hour targets, then 2 9-hour targets, etc. I then noticed that "base" mostly 
duplicated the role of min_compaction_threshold, so I removed it in favor of 
that. Any input on that choice?
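
Just to make the units concrete, here is a toy snippet (not from the patch) 
showing how the target sizes would grow with base = 3 and a time_unit of one 
hour expressed in microseconds:

    // Illustrative only: tier sizes grow geometrically as time_unit * base^n,
    // i.e. 1 hour, 3 hours, 9 hours, 27 hours, ...
    public class TierSizes
    {
        public static void main(String[] args)
        {
            long timeUnit = 3600000000L; // one hour, in microseconds
            long base = 3;               // the role now played by min_compaction_threshold
            long size = timeUnit;
            for (int tier = 0; tier < 5; tier++)
            {
                System.out.println("tier " + tier + ": " + size / timeUnit + " hour(s) = " + size + " microseconds");
                size *= base;
            }
        }
    }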

The above solution is similar to my final algorithm, but if you noticed my 
wording, you'll see that the above suffers from the "sliding windows" issue. 
It's most probably a bad idea to use something like "last hour" or "between 18 
hours ago and 9 hours ago" as targets, since those would be moving targets. 
Some integer arithmetic fits perfectly as a way around this. Let "now" be the 
current time (I get it by taking the greatest max timestamp across all 
SSTables; any input on that?). Let's keep timeUnit at an hour and base 
(min_compaction_threshold) at 3. Using integer division, now/timeUnit will 
yield the same value for an entire hour. Now we can use that as our first 
target. Unless we recently passed an hour border, (now - 20 seconds)/timeUnit 
will have the same result. We can't simply use the old approach of making 2 
single-hour targets before that, etc., because that will just lead to the same 
sliding windows, only with considerably lower resolution (certainly better than 
before, though). Rather, consider what happens if you integer divide this 
initial target by base. Then we get a new target representing the 3-hour time 
span, perfectly aligned on 3-hour borders, that contains the current hour. The 
current hour could be either the 1st, 2nd or 3rd one of those hours (nothing 
in-between). You can continue like that, dividing by base again to get the 
perfectly aligned 9-hour time span containing the aforementioned 3-hour span in 
one of three slots.
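
Here is a toy illustration of that arithmetic, with a made-up timestamp in 
microseconds (the concrete numbers are mine; only the divisions matter):

    public class AlignmentDemo
    {
        public static void main(String[] args)
        {
            long timeUnit = 3600000000L;        // one hour, in microseconds
            long base = 3;                      // min_compaction_threshold

            long now = 1412340000000000L;       // some timestamp in microseconds
            long t0 = now / timeUnit;           // the current hour, as an integer

            // Anything within the same hour divides down to the same value...
            System.out.println(t0 == (now - 20000000L) / timeUnit); // 20 seconds earlier: true here
            // ...unless we just crossed an hour border.

            long span3h = t0 / base;            // aligned 3-hour span containing the current hour
            long span9h = span3h / base;        // aligned 9-hour span containing that 3-hour span
            System.out.println("hour=" + t0 + ", 3h-span=" + span3h + ", 9h-span=" + span9h);
        }
    }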

This could be the sequence of targets to use. I chose not to use it, and this 
is again somewhere I could use a second opinion. Instead, I let the initial 
target be t0 = now/timeUnit (integer division is implicit), just like before, but 
the next target is t1 = (t0/base) - 1. That is, the perfectly aligned 3-hour 
time span right before the one that t0 was in. Then t2 = (t1/base) - 1, and so 
on. With this change, new SSTables aren't candidates for all targets after the 
first one they "hit", as they would have been without this last change.

Now we're almost there. The first odd thing about this sequence of targets is 
that there are now gaps between targets. If t0 is not the first hour of a 
perfectly aligned 3-hour time span (in integer arithmetic terms: if (t0 % base 
> 0)), then the next target t1 (a 3-hour span) will not end right before t0. I 
thought it made sense to fill in those gaps. So in those cases, when (tn % base 
> 0), I did it in the simplest way, by letting the next target t(n+1) have the 
same size as the current target, but moved one step earlier. Essentially, t(n+1) 
= tn - 1.
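
Putting the two rules together, the step from one target to the next can be 
sketched as follows. A target is written as a (size, position) pair, both in 
timeUnit units, and the sequence starts at size 1, position now/timeUnit; the 
names are mine, not necessarily the patch's:

    class TargetSequence
    {
        // Returns the target right before (size, position), following the rules above.
        static long[] nextTarget(long size, long position, long base)
        {
            if (position % base > 0)
                return new long[]{ size, position - 1 };           // fill the gap: same size, one step back
            return new long[]{ size * base, position / base - 1 }; // otherwise, step up to the next tier
        }
    }

Iterating this until the position goes negative yields all targets back to the 
epoch.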

This is exactly the way DTCS is implemented right now. Whenever at least 
min_compaction_threshold SSTables hit the same target, they get submitted for 
compaction. If multiple targets succeed, the first (i.e. latest) one is picked. 
In the first patch that I submitted here, one thing was done differently: the 
initial target was (now/timeUnit) - 1, meaning that the current hour was not 
included in any target. I'm no expert on which of these is best, but the 
difference is that the approach in that patch let new SSTables queue up until 
we passed an hour border (or whatever timeUnit is set to). Then they would all 
compact at once. The current implementation favors compacting and recompacting 
them as much as possible for the first hour (but not if there are fewer of them 
than min_compaction_threshold).
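
As a rough sketch of that selection step, building on the Target sketch from 
above and reducing each SSTable to its min timestamp to keep the example 
self-contained (method and variable names are hypothetical):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class CandidateSelection
    {
        // targets must be ordered newest first; returns the min timestamps of the
        // SSTables that should be compacted together, or an empty list if no target
        // is hit by at least minThreshold SSTables.
        static List<Long> pickCandidates(List<Long> sstableMinTimestamps, List<Target> targets, int minThreshold)
        {
            for (Target target : targets)
            {
                List<Long> onTarget = new ArrayList<>();
                for (long minTimestamp : sstableMinTimestamps)
                    if (target.onTarget(minTimestamp))
                        onTarget.add(minTimestamp);
                if (onTarget.size() >= minThreshold)
                    return onTarget; // the first (i.e. latest) successful target wins
            }
            return Collections.emptyList();
        }
    }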

A side effect of this is that if min_compaction_threshold is, say, 4, you might 
have 1 big and 2 small SSTables from the last hour the moment time (more 
specifically, the "now" variable) crosses over to a new hour; they will then 
stay uncompacted like that until those SSTables enter a bigger target. At that 
point, what you would have expected to be 4 similarly sized SSTables in that 
new target would rather be 3 similarly sized ones, 1 a tad smaller, and 2 small 
ones (actually, the same thing is likely to have happened during other hours 
too). But after that compaction happens, everything is as it should be**. I 
don't believe that it's a big deal. There are a couple of ways around it. The 
(now/timeUnit) - 1 initial target approach fixes this. Another way would be to 
ignore min_compaction_threshold for anything beyond how I use it as a "base", 
or to keep base and min_compaction_threshold separate, letting you set 
min_compaction_threshold to 2. Does anyone have any ideas about this?

From what I remember right now, the last tweak that I've been thinking about 
is the following. Let's use our base=3, timeUnit=hour example from before. If 
we have (t0 % base == 0), we know that we're at a 3-hour border. Right now 
that means that the next target is t1 = (t0/base) - 1. But what if we're 
actually also at a 9-hour border? Then we could make the next target 9 times 
bigger and put it as (t0/(base^2)) - 1 instead. But we might even be at a 
27-hour border. I hope you see where this is going. Using this knowledge, we 
could more aggressively compact SSTables up to bigger sizes at an earlier 
point. That tweak might be worth considering.
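
A sketch of what that tweak could look like, as a variant of the stepping rule 
from before (again hypothetical code, not part of any submitted patch):

    class AggressiveTargetSequence
    {
        static long[] nextTarget(long size, long position, long base)
        {
            if (position % base > 0)
                return new long[]{ size, position - 1 };  // fill the gap: same size, one step back
            long newSize = size;
            long newPosition = position;
            // While we also sit exactly on the border of the next-larger span,
            // keep growing, so the next target becomes the largest aligned span
            // that ends right before the current one.
            while (newPosition % base == 0 && newPosition > 0)
            {
                newSize *= base;
                newPosition /= base;
            }
            return new long[]{ newSize, newPosition - 1 };
        }
    }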

I'll end this detailed explanation with a simple visualization that might help 
in getting some intuition about how the targets "move". Here, base is 4 and we 
let time go from time 0 to 32. On each line, one timeUnit has passed; what you 
see is a list of targets and which timestamps they cover (after division by 
timeUnit).
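
For anyone who wants to play with this, a small throwaway program (hypothetical, 
not part of the patch) that applies the stepping rule sketched earlier should 
regenerate the "Current implementation" listing below:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class TargetVisualizer
    {
        public static void main(String[] args)
        {
            long base = 4;
            for (long now = 0; now <= 32; now++)
            {
                List<String> targets = new ArrayList<>();
                long size = 1;
                long position = now;                     // t0 = now / timeUnit, with timeUnit = 1 here
                while (position >= 0)
                {
                    targets.add(cover(size, position));
                    if (position % base > 0)
                    {
                        position -= 1;                   // gap filling: same size, one step back
                    }
                    else
                    {
                        position = position / base - 1;  // step up to the next tier
                        size *= base;
                    }
                }
                Collections.reverse(targets);            // oldest target first, like the listing
                System.out.println("[" + String.join(",", targets) + "]");
            }
        }

        // Renders the time units a target covers, e.g. "[4,5,6,7]" for size 4, position 1.
        private static String cover(long size, long position)
        {
            List<String> units = new ArrayList<>();
            for (long t = size * position; t < size * (position + 1); t++)
                units.add(Long.toString(t));
            return "[" + String.join(",", units) + "]";
        }
    }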


Sliding windows approach:
[[0]]
[[0],[1]]
[[0],[1],[2]]
[[0],[1],[2],[3]]
[[0],[1],[2],[3],[4]]
[[0,1],[2],[3],[4],[5]]
[[0,1,2],[3],[4],[5],[6]]
[[0,1,2,3],[4],[5],[6],[7]]
[[0],[1,2,3,4],[5],[6],[7],[8]]
[[0,1],[2,3,4,5],[6],[7],[8],[9]]
[[0,1,2],[3,4,5,6],[7],[8],[9],[10]]
[[0,1,2,3],[4,5,6,7],[8],[9],[10],[11]]
[[0],[1,2,3,4],[5,6,7,8],[9],[10],[11],[12]]
[[0,1],[2,3,4,5],[6,7,8,9],[10],[11],[12],[13]]
[[0,1,2],[3,4,5,6],[7,8,9,10],[11],[12],[13],[14]]
[[0,1,2,3],[4,5,6,7],[8,9,10,11],[12],[13],[14],[15]]
[[0],[1,2,3,4],[5,6,7,8],[9,10,11,12],[13],[14],[15],[16]]
[[0,1],[2,3,4,5],[6,7,8,9],[10,11,12,13],[14],[15],[16],[17]]
[[0,1,2],[3,4,5,6],[7,8,9,10],[11,12,13,14],[15],[16],[17],[18]]
[[0,1,2,3],[4,5,6,7],[8,9,10,11],[12,13,14,15],[16],[17],[18],[19]]
[[0,1,2,3,4],[5,6,7,8],[9,10,11,12],[13,14,15,16],[17],[18],[19],[20]]
[[0,1,2,3,4,5],[6,7,8,9],[10,11,12,13],[14,15,16,17],[18],[19],[20],[21]]
[[0,1,2,3,4,5,6],[7,8,9,10],[11,12,13,14],[15,16,17,18],[19],[20],[21],[22]]
[[0,1,2,3,4,5,6,7],[8,9,10,11],[12,13,14,15],[16,17,18,19],[20],[21],[22],[23]]
[[0,1,2,3,4,5,6,7,8],[9,10,11,12],[13,14,15,16],[17,18,19,20],[21],[22],[23],[24]]
[[0,1,2,3,4,5,6,7,8,9],[10,11,12,13],[14,15,16,17],[18,19,20,21],[22],[23],[24],[25]]
[[0,1,2,3,4,5,6,7,8,9,10],[11,12,13,14],[15,16,17,18],[19,20,21,22],[23],[24],[25],[26]]
[[0,1,2,3,4,5,6,7,8,9,10,11],[12,13,14,15],[16,17,18,19],[20,21,22,23],[24],[25],[26],[27]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12],[13,14,15,16],[17,18,19,20],[21,22,23,24],[25],[26],[27],[28]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13],[14,15,16,17],[18,19,20,21],[22,23,24,25],[26],[27],[28],[29]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14],[15,16,17,18],[19,20,21,22],[23,24,25,26],[27],[28],[29],[30]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20,21,22,23],[24,25,26,27],[28],[29],[30],[31]]
[[0],[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16],[17,18,19,20],[21,22,23,24],[25,26,27,28],[29],[30],[31],[32]]


Current implementation:
[[0]]
[[0],[1]]
[[0],[1],[2]]
[[0],[1],[2],[3]]
[[0,1,2,3],[4]]
[[0,1,2,3],[4],[5]]
[[0,1,2,3],[4],[5],[6]]
[[0,1,2,3],[4],[5],[6],[7]]
[[0,1,2,3],[4,5,6,7],[8]]
[[0,1,2,3],[4,5,6,7],[8],[9]]
[[0,1,2,3],[4,5,6,7],[8],[9],[10]]
[[0,1,2,3],[4,5,6,7],[8],[9],[10],[11]]
[[0,1,2,3],[4,5,6,7],[8,9,10,11],[12]]
[[0,1,2,3],[4,5,6,7],[8,9,10,11],[12],[13]]
[[0,1,2,3],[4,5,6,7],[8,9,10,11],[12],[13],[14]]
[[0,1,2,3],[4,5,6,7],[8,9,10,11],[12],[13],[14],[15]]
[[0,1,2,3],[4,5,6,7],[8,9,10,11],[12,13,14,15],[16]]
[[0,1,2,3],[4,5,6,7],[8,9,10,11],[12,13,14,15],[16],[17]]
[[0,1,2,3],[4,5,6,7],[8,9,10,11],[12,13,14,15],[16],[17],[18]]
[[0,1,2,3],[4,5,6,7],[8,9,10,11],[12,13,14,15],[16],[17],[18],[19]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20],[21]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20],[21],[22]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20],[21],[22],[23]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20,21,22,23],[24]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20,21,22,23],[24],[25]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20,21,22,23],[24],[25],[26]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20,21,22,23],[24],[25],[26],[27]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20,21,22,23],[24,25,26,27],[28]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20,21,22,23],[24,25,26,27],[28],[29]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20,21,22,23],[24,25,26,27],[28],[29],[30]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20,21,22,23],[24,25,26,27],[28],[29],[30],[31]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20,21,22,23],[24,25,26,27],[28,29,30,31],[32]]


More aggressive tweak:
[[0]]
[[0],[1]]
[[0],[1],[2]]
[[0],[1],[2],[3]]
[[0,1,2,3],[4]]
[[0,1,2,3],[4],[5]]
[[0,1,2,3],[4],[5],[6]]
[[0,1,2,3],[4],[5],[6],[7]]
[[0,1,2,3],[4,5,6,7],[8]]
[[0,1,2,3],[4,5,6,7],[8],[9]]
[[0,1,2,3],[4,5,6,7],[8],[9],[10]]
[[0,1,2,3],[4,5,6,7],[8],[9],[10],[11]]
[[0,1,2,3],[4,5,6,7],[8,9,10,11],[12]]
[[0,1,2,3],[4,5,6,7],[8,9,10,11],[12],[13]]
[[0,1,2,3],[4,5,6,7],[8,9,10,11],[12],[13],[14]]
[[0,1,2,3],[4,5,6,7],[8,9,10,11],[12],[13],[14],[15]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16],[17]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16],[17],[18]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16],[17],[18],[19]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20],[21]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20],[21],[22]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20],[21],[22],[23]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20,21,22,23],[24]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20,21,22,23],[24],[25]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20,21,22,23],[24],[25],[26]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20,21,22,23],[24],[25],[26],[27]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20,21,22,23],[24,25,26,27],[28]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20,21,22,23],[24,25,26,27],[28],[29]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20,21,22,23],[24,25,26,27],[28],[29],[30]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19],[20,21,22,23],[24,25,26,27],[28],[29],[30],[31]]
[[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],[16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31],[32]]


Hope that enlightens more than it blinds! It would be great to hear some 
feedback on this. More might be said about the effects of using DTCS, what it's 
good for, etc. But that's another topic.

*  A possible extension to DTCS is to add an option (default = 0L) for a time 
that should be considered time 0, rather than the Unix epoch.
** Slight reservation: max_compaction_threshold should be at least 
min_compaction_threshold^2, to be absolutely sure.

> Compaction improvements to optimize time series data
> ----------------------------------------------------
>
>                 Key: CASSANDRA-6602
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-6602
>             Project: Cassandra
>          Issue Type: New Feature
>          Components: Core
>            Reporter: Tupshin Harper
>            Assignee: Björn Hegerfors
>              Labels: compaction, performance
>             Fix For: 3.0
>
>         Attachments: 1 week.txt, 8 weeks.txt, STCS 16 hours.txt, 
> TimestampViewer.java, 
> cassandra-2.0-CASSANDRA-6602-DateTieredCompactionStrategy.txt, 
> cassandra-2.0-CASSANDRA-6602-DateTieredCompactionStrategy_v2.txt, 
> cassandra-2.0-CASSANDRA-6602-DateTieredCompactionStrategy_v3.txt
>
>
> There are some unique characteristics of many/most time series use cases that 
> both provide challenges, as well as provide unique opportunities for 
> optimizations.
> One of the major challenges is in compaction. The existing compaction 
> strategies will tend to re-compact data on disk at least a few times over the 
> lifespan of each data point, greatly increasing the cpu and IO costs of that 
> write.
> Compaction exists to
> 1) ensure that there aren't too many files on disk
> 2) ensure that data that should be contiguous (part of the same partition) is 
> laid out contiguously
> 3) delete data due to TTLs or tombstones
> The special characteristics of time series data allow us to optimize away all 
> three.
> Time series data
> 1) tends to be delivered in time order, with relatively constrained exceptions
> 2) often has a pre-determined and fixed expiration date
> 3) Never gets deleted prior to TTL
> 4) Has relatively predictable ingestion rates
> Note that I filed CASSANDRA-5561 and this ticket potentially replaces or 
> lowers the need for it. In that ticket, jbellis reasonably asks how that 
> compaction strategy is better than disabling compaction.
> Taking that to heart, here is a compaction-strategy-less approach that could 
> be extremely efficient for time-series use cases that follow the above 
> pattern.
> (For context, I'm thinking of an example use case involving lots of streams 
> of time-series data with a 5GB per day ingestion rate, and a 1000 day 
> retention with TTL, resulting in an eventual steady state of 5TB per node)
> 1) You have an extremely large memtable (preferably off heap, if/when doable) 
> for the table, and that memtable is sized to be able to hold a lengthy window 
> of time. A typical period might be one day. At the end of that period, you 
> flush the contents of the memtable to an sstable and move to the next one. 
> This is basically identical to current behaviour, but with thresholds 
> adjusted so that you can ensure flushing at predictable intervals. (Open 
> question is whether predictable intervals is actually necessary, or whether 
> just waiting until the huge memtable is nearly full is sufficient)
> 2) Combine the behaviour with CASSANDRA-5228 so that sstables will be 
> efficiently dropped once all of the columns have expired. (Another side note, it 
> might be valuable to have a modified version of CASSANDRA-3974 that doesn't 
> bother storing per-column TTL since it is required that all columns have the 
> same TTL)
> 3) Be able to mark column families as read/write only (no explicit deletes), 
> so no tombstones.
> 4) Optionally add back an additional type of delete that would delete all 
> data earlier than a particular timestamp, resulting in immediate dropping of 
> obsoleted sstables.
> The result is that for in-order delivered data, every cell will be laid out 
> optimally on disk on the first pass, and over the course of 1000 days and 5TB 
> of data, there will "only" be 1000 5GB sstables, so the number of filehandles 
> will be reasonable.
> For exceptions (out-of-order delivery), most cases will be caught by the 
> extended (24 hour+) memtable flush times and merged correctly automatically. 
> For those that were slightly askew at flush time, or were delivered so far 
> out of order that they go in the wrong sstable, there is relatively low 
> overhead to reading from two sstables for a time slice, instead of one, and 
> that overhead would be incurred relatively rarely unless out-of-order 
> delivery was the common case, in which case, this strategy should not be used.
> Another possible optimization to address out-of-order delivery would be to 
> maintain more than one time-centric memtable in memory at a time (e.g. two 12 hour 
> ones), and then you always insert into whichever one of the two "owns" the 
> appropriate range of time. By delaying flushing the ahead one until we are 
> ready to roll writes over to a third one, we are able to avoid any 
> fragmentation as long as all deliveries come in no more than 12 hours late 
> (in this example, presumably tunable).
> Anything that triggers compactions will have to be looked at, since there 
> won't be any. The one concern I have is the ramification of repair. 
> Initially, at least, I think it would be acceptable to just write one sstable 
> per repair and not bother trying to merge it with other sstables.


