[ 
https://issues.apache.org/jira/browse/KAFKA-631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13504192#comment-13504192
 ] 

Jay Kreps commented on KAFKA-631:
---------------------------------

Here is a specific proposal:

We will retain the existing settings that retain segments based on bytes and 
time, with data prior to these limits left unmolested. We will introduce a new 
setting for each topic "cleanup.policy"={delete, dedupe}. cleanup.policy=delete 
will correspond to the current behavior. cleanup.policy=dedupe will correspond 
to the new behavior described in this JIRA. As now, data that falls inside the 
retention window will not be touched, but data that is outside that window will 
be deduplicated rather than deleted. It is intended that this be a per-topic 
setting specified at topic creation time. As a short-cut for the purposes of 
this ticket I will just add a configuration map setting the policy in the way 
we have for other topic-level settings; these can all be refactored into 
something set in the create/alter topic command as a follow-up item.

Topics getting dedupe will be processed by a pool of background "cleaner" 
threads. These threads will periodically recopy old segment files removing 
obsolete messages and swapping in these new deduplicated files in place of the 
old segments. These sparse files should already be well-supported by the 
logical and sparse offset work in 0.8.

Here are the specific changes intended:
- Add a few new configs: 
   - topic.cleanup.policy={delete,dedupe} // A map of cleanup policies, 
defaults to delete
   - cleaner.thread.pool.size=# // The number of background threads to use for 
cleaning 
   - cleaner.buffer.size.bytes=# // The maximum amount of heap memory per 
cleaner thread that can be used for log deduplication
   - cleaner.max.{read,write}.throughput=# // The maximum bytes per second the 
cleaner can read or write
- Add a new method Log.replaceSegments() that replaces one or more old segments 
with a new segment while holding the log lock
- Implement a background cleaner thread that does the recopying. This thread 
will be owned and maintained by LogManager
- Add a new file per data directory called cleaner-metadata that records, for 
each dedupe-enabled log in that directory, the point up to which it has been 
cleaned. This allows the cleaner to resume cleaning from the same point after 
a restart.
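
For illustration, the proposed settings might look like this in a broker 
properties file (the property names are as proposed above, but the map syntax 
for topic.cleanup.policy and the topic names are assumptions for this sketch):

```properties
# Hypothetical example of the proposed settings; exact names/syntax may change.
topic.cleanup.policy=user-accounts:dedupe,clickstream:delete  # per-topic map, defaults to delete
cleaner.thread.pool.size=1             # number of background cleaner threads
cleaner.buffer.size.bytes=104857600    # 100 MB of dedupe buffer per cleaner thread
cleaner.max.read.throughput=10485760   # max bytes/sec the cleaner may read
cleaner.max.write.throughput=10485760  # max bytes/sec the cleaner may write
```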

The cleaning algorithm for a single log will work as follows:
1. Scan the head of the log (i.e. all messages since the last cleaning) and 
create a Map of key => offset for messages in the head of the log. If the 
cleaner buffer is too small to scan the full head of the log then just scan 
whatever fits going from oldest to newest.
2. Sequentially clean segments from oldest to newest.
3. To clean a segment, first create a new empty copy of the segment file with a 
temp name. Check each message in the original segment. If it is contained in 
the map with a higher offset, ignore it; otherwise recopy it to the new temp 
segment. When the segment is complete swap in the new file and delete the old. 
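
As a rough illustration, the three steps above can be sketched in Python 
(the record shape and function names here are hypothetical, not the actual 
Kafka implementation; segments are modeled as lists of (offset, key, value) 
tuples):

```python
# Hypothetical sketch of the cleaning pass; not the real Kafka code.

def build_offset_map(head_records):
    """Step 1: map each key to its latest offset in the head of the log."""
    offset_map = {}
    for offset, key, _value in head_records:
        offset_map[key] = max(offset, offset_map.get(key, -1))
    return offset_map

def clean_segment(segment, offset_map):
    """Step 3: recopy a segment, dropping records the map knows a newer offset for."""
    cleaned = []
    for offset, key, value in segment:
        # Keep the record unless a strictly higher offset exists for its key.
        if offset_map.get(key, -1) <= offset:
            cleaned.append((offset, key, value))
    return cleaned

def clean_log(segments, head_records):
    """Step 2: clean segments sequentially, oldest to newest."""
    offset_map = build_offset_map(head_records)
    return [clean_segment(segment, offset_map) for segment in segments]
```

In the real implementation the "swap" step would write the cleaned records to 
a temp segment file and atomically swap it in under the log lock; here the 
swap is simulated by simply returning new lists.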

The threads will iterate over the logs and clean them periodically (not sure 
of the right frequency yet). 

Some Nuances:
1. The above tends to lead to smaller and smaller segment files in the tail of 
the log as records are overwritten. To avoid this we will combine files; that 
is, we will always collect the largest set of consecutive files that together 
are smaller than the max segment size and recopy them into a single segment. 
Obviously this grouping is based on the starting sizes, so the resulting 
segment will likely still end up smaller than the max segment size.
2. The recopying procedure depends on the property that logs are immutable. 
However our logs are only mostly immutable. It is possible to truncate a log to 
any segment. It is important that the cleaner respect this and not have a race 
condition with potential truncate operations. But likewise we can't lock for 
the duration of the cleaning as it may be quite slow. To work around this I 
will add a generation counter to the log. Each truncate operation will 
increment this counter. The cleaner will record the generation before it begins 
cleaning and the swap operation that swaps in the new, cleaned segment will 
only occur if the generations match (i.e. if no truncates happened in that 
segment during cleaning). This will potentially result in some wasted cleaner 
work when truncatations collide with cleanings, but since truncates are rare 
and truncates deep enough into the log to interact with cleaning very rare this 
should almost never happen.
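
Nuance 1, the greedy grouping of undersized tail segments, can be sketched as 
follows (a hypothetical helper operating on segment sizes in bytes, not the 
actual implementation):

```python
# Hypothetical sketch of nuance 1: greedily group consecutive segments,
# oldest first, so each group's combined starting size stays under the max
# segment size; each group would then be recopied into one new segment.

def group_segments(segment_sizes, max_segment_size):
    groups, current, current_size = [], [], 0
    for size in segment_sizes:
        if current and current_size + size > max_segment_size:
            groups.append(current)  # close the group before it overflows
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups
```

Note that a single segment already larger than the max simply forms a group 
of its own, since existing segments are never split.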
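
Nuance 2, the generation counter that guards the swap against concurrent 
truncates, might look roughly like this (hypothetical names; the real 
Log.replaceSegments would swap segment files on disk under the log lock):

```python
import threading

# Hypothetical sketch of nuance 2: a truncate bumps the log's generation, so
# a clean that started before the truncate fails its swap instead of racing.

class Log:
    def __init__(self, segments):
        self.segments = segments
        self.generation = 0
        self.lock = threading.Lock()

    def truncate_to(self, n):
        with self.lock:
            self.segments = self.segments[:n]
            self.generation += 1  # invalidate any in-flight cleaning

    def replace_segments(self, new_segments, expected_generation):
        with self.lock:
            if self.generation != expected_generation:
                return False  # a truncate happened; discard the cleaned copy
            self.segments = new_segments
            return True

def clean(log):
    expected = log.generation     # record the generation before cleaning
    cleaned = list(log.segments)  # (the slow recopy happens here, unlocked)
    return log.replace_segments(cleaned, expected)
```

Only the brief swap takes the lock; the slow recopy runs unlocked, which is 
exactly why the generation check is needed.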
                
> Implement log compaction
> ------------------------
>
>                 Key: KAFKA-631
>                 URL: https://issues.apache.org/jira/browse/KAFKA-631
>             Project: Kafka
>          Issue Type: New Feature
>          Components: core
>    Affects Versions: 0.8.1
>            Reporter: Jay Kreps
>
> Currently Kafka has only one way to bound the space of the log, namely by 
> deleting old segments. The policy that controls which segments are deleted 
> can be configured based either on the number of bytes to retain or the age of 
> the messages. This makes sense for event or log data which has no notion of 
> primary key. However lots of data has a primary key and consists of updates 
> by primary key. For this data it would be nice to be able to ensure that the 
> log contained at least the last version of every key.
> As an example, say that the Kafka topic contains a sequence of User Account 
> messages, each capturing the current state of a given user account. Rather 
> than simply discarding old segments, since the set of user accounts is 
> finite, it might make more sense to delete individual records that have been 
> made obsolete by a more recent update for the same key. This would ensure 
> that the topic contained at least the current state of each record.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
