[ https://issues.apache.org/jira/browse/KAFKA-521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jay Kreps updated KAFKA-521:
----------------------------

    Attachment: KAFKA-521-v1.patch

Changes

FileMessageSet:
- Improve docs
- Rename constructor argument initChannelPositionToEnd to isSlice (i.e. describe what it means, not what it does internally)
- Use isSlice to avoid calling fileChannel.size (which stats the file) for slices. This was happening on every read, which was a minor performance bug.
- Fix a minor bug in FileMessageSet.iterator. The iterator was not working correctly for a sliced message set: it respected the start argument but not the limit, so it would always yield a complete log suffix. This is not too important because the iterator is only used for testing at this time.

Log:
- Remove SegmentList; replace it with ConcurrentSkipListMap, a synchronized, sorted map of offset->segment. The goal is simply to reduce tricky custom code. This makes duplicate entries in the segment list, which were a common problem, impossible. It is likely not quite as efficient, but should be good enough.
- Remove Log.findRange (since we removed SegmentList)
- Move Log static/object methods to the bottom of the file for clarity
- Support rebuilding the index file if it is missing
- Move recoverSegment into LogSegment as LogSegment.recover, since it is segment-specific
- Move the per-topic message count metric out of Log and into KafkaApis, since Log is not aware of topics
- Change Log.append to return the full set of information about the append (first offset, last offset, number of messages, and compression codec) as a case class LogAppendInfo (previously MessageSetInfo). We were already calculating this as part of the append; this just exposes it. It is needed to move the per-topic monitoring out so we can still get the message count without computing it a second time.
- Clean up the control flow in append. It is now very imperative but, I think, a lot more readable.
- Log.read now supports reads that fall on the border between two segments when we have sparse offsets (i.e. if we have garbage-collected messages at the end of a log segment, the read begins in one segment, finds nothing, and needs to go on to the next). This situation can't yet arise since there is no cleaning, but that will happen soon.
- Combine markDeletedWhile() and deleteSegments() into a single method, deleteOldSegments, that both removes entries from the skip list and deletes the files, since we always did both together.
- Remove Log.rollToOffset; since the only offset we ever rolled to was logEndOffset, we now just have Log.roll()
- Add helper methods activeSegment() and logSegments() to get either the last segment or all segments sorted by offset
- Move getOffsetsBefore out of Log and into KafkaApis, since this logic is very specific to the API and not general Log functionality. Instead, KafkaApis uses Log.logSegments() to process the segments directly. This is much cleaner and more generic (e.g. it could support a more generalized version of that API without change).
- Refactor truncateTo to work with the new segment map
- Rename truncateAndStartWithNewOffset(offset) to truncateFullyAndStartAt(offset) and refactor it to work with the new segment map
- Remove topicName, since Log is not knowledgeable about topics

LogManager:
- Remove brokerId from LogManager, since LogManager doesn't know about brokers
- Remove LogManager.getOffsets and move it into KafkaApis (see description above)

SegmentList, Range, SegmentListTest: deleted

LogSegment:
- Move recover() method out of Log and into LogSegment
- No longer implement Range, since we no longer have to work with SegmentList
- Rename start to baseOffset for clarity
- Change firstAppendTime to created.
No longer try to initialize this on the first append; instead, initialize it when the segment is created or truncated. Neither of these is quite correct (since we don't have a persistent created time), but this is no less correct and is less hacky.
- Remove the deleted flag, since we weren't using it
- Mark the individual methods in this class as either threadsafe or nonthreadsafe
- Change LogSegment.read() to return null if the start position is beyond the last message in this segment. This is necessary to allow us to detect this case and skip on to the next segment in the Log. This handles the gap case discussed above under Log.
- Move the deleteSegment() method out of Log and into LogSegment

MessageSet:
- Change the toString method. Previously it would create a string for the whole message set. This is reasonable for ByteBufferMessageSet but not for FileMessageSet. Instead, limit this to the first 100 messages to avoid a potential OOM due to logging.

ProducerConfig:
- Change producer.num.retries to default to 1; we should not default to something that can cause duplicates

KafkaApis:
- Move getOffsets methods out of Log and into KafkaApis

FileMessageSetTest:
- Document all test cases

LogManagerTest:
- Document all test cases

LogOffsetTest:
- Move out of kafka.log and into kafka.server
- Delete the section that covers the getOffsets API, since that is covered already in a getOffsets-specific test

LogSegmentTest:
- Document all test cases

LogTest:
- Add a test case to better cover non-sequential offsets
- Add a test case to cover the case where a read falls off the end of the segment (discussed above)
- Add a test case to cover the case where the index has been deleted
- Misc improvements

> Refactor Log subsystem
> ----------------------
>
>                 Key: KAFKA-521
>                 URL: https://issues.apache.org/jira/browse/KAFKA-521
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jay Kreps
>         Attachments: KAFKA-521-v1.patch
>
>
> There are a number of items it would be nice to clean up in the log
> subsystem:
> 1. Misc. funky APIs in Log and LogManager
> 2. Much of the functionality in Log should move into LogSegment along with
> corresponding tests
> 3. We should remove SegmentList and instead use a ConcurrentSkipListMap
> The general idea of the refactoring falls into two categories. First, improve
> and thoroughly document the public APIs. Second, have a clear delineation of
> responsibility between the various layers:
> 1. LogManager is responsible for the creation and deletion of logs as well as
> the retention of data in log segments. LogManager is the only layer aware of
> partitions and topics. LogManager consists of a bunch of individual Log
> instances and interacts with them only through their public API (mostly true
> today).
> 2. Log represents a totally ordered log. Log is responsible for reading,
> appending, and truncating the log. A log consists of a bunch of LogSegments.
> Currently much of the functionality in Log should move into LogSegment, with
> Log interacting only through the LogSegment interface. Currently we reach
> around this a lot to call into FileMessageSet and OffsetIndex.
> 3. A LogSegment consists of an OffsetIndex and a FileMessageSet. It supports
> largely the same APIs as Log, but localized to a single segment.
> This cleanup will simplify testing and debugging because it will make the
> responsibilities and guarantees at each layer more clear.
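The ConcurrentSkipListMap replacement described in the changelog can be sketched roughly as follows. This is a minimal illustration, not the actual Kafka code: the Segment class is a hypothetical stand-in for LogSegment, but the map operations shown (floorEntry for offset lookup, lastEntry for the active segment, put's replace-on-duplicate semantics) are the real java.util.concurrent.ConcurrentSkipListMap API.

```java
import java.util.concurrent.ConcurrentSkipListMap;

public class SegmentMapSketch {
    // Hypothetical stand-in for a log segment; only the base offset matters here.
    static final class Segment {
        final long baseOffset;
        Segment(long baseOffset) { this.baseOffset = baseOffset; }
    }

    public static void main(String[] args) {
        // Sorted, thread-safe map of baseOffset -> segment, replacing SegmentList.
        ConcurrentSkipListMap<Long, Segment> segments = new ConcurrentSkipListMap<>();
        for (long base : new long[] {0L, 100L, 250L}) {
            segments.put(base, new Segment(base));
        }

        // floorEntry finds the segment that could contain a given offset:
        // the entry with the greatest baseOffset <= the target offset.
        System.out.println(segments.floorEntry(175L).getValue().baseOffset); // 100

        // lastEntry yields the active (most recently created) segment.
        System.out.println(segments.lastEntry().getValue().baseOffset); // 250

        // Duplicate entries are impossible by construction: inserting an
        // existing baseOffset simply replaces the old mapping.
        segments.put(100L, new Segment(100L));
        System.out.println(segments.size()); // 3
    }
}
```

This is why the patch can drop SegmentList entirely: the sorted-map operations cover lookup, the active segment, and ordered iteration with no custom list code.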
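The boundary-read case described under Log (a read that lands in a segment whose tail has been cleaned away, so it must fall through to the next segment) can be sketched like this. The Segment class and its read() method are hypothetical simplifications; the read() contract of returning null when the start offset is past the last message mirrors the new LogSegment.read() behavior described above.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class SegmentedReadSketch {
    // Hypothetical segment: holds the offsets it actually contains (may be sparse).
    static final class Segment {
        final long[] offsets;
        Segment(long... offsets) { this.offsets = offsets; }

        // Returns the first offset >= startOffset, or null if startOffset is
        // beyond the last message in this segment (the new read() contract).
        Long read(long startOffset) {
            for (long o : offsets) if (o >= startOffset) return o;
            return null;
        }
    }

    // Read loop: start at the segment the offset maps to; if that segment
    // yields nothing, fall through to the next segment rather than failing.
    static Long read(NavigableMap<Long, Segment> segments, long offset) {
        long startKey = segments.floorKey(offset);
        for (Segment s : segments.tailMap(startKey, true).values()) {
            Long result = s.read(offset);
            if (result != null) return result;
        }
        return null; // offset is past the end of the log
    }

    public static void main(String[] args) {
        NavigableMap<Long, Segment> segments = new TreeMap<>();
        segments.put(0L, new Segment(0, 1, 2));   // offsets 3..9 cleaned away
        segments.put(10L, new Segment(10, 11));
        // Offset 5 maps into the first segment but only exists in the next one.
        System.out.println(read(segments, 5L)); // 10
    }
}
```

As the changelog notes, this gap can't yet arise without log cleaning, but the read path is ready for it.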
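The MessageSet.toString change (bounding the string to the first 100 messages so logging a huge FileMessageSet cannot OOM) amounts to capping iteration and marking truncation. A minimal sketch, using a plain iterator of offsets as a stand-in for a real message set:

```java
import java.util.Iterator;
import java.util.stream.LongStream;

public class BoundedToStringSketch {
    // Build a bounded string from a potentially huge message iterator.
    static String toStringLimited(Iterator<Long> messages, int limit) {
        StringBuilder b = new StringBuilder("MessageSet(");
        int shown = 0;
        while (messages.hasNext() && shown < limit) {
            if (shown > 0) b.append(", ");
            b.append(messages.next());
            shown++;
        }
        // Mark truncation instead of materializing the rest (avoids OOM).
        if (messages.hasNext()) b.append(", ...");
        return b.append(')').toString();
    }

    public static void main(String[] args) {
        // Small set: fully rendered.
        System.out.println(toStringLimited(LongStream.range(0, 3).iterator(), 100));
        // Huge set with a small limit: only the prefix is rendered.
        System.out.println(toStringLimited(LongStream.range(0, 1_000_000).iterator(), 2));
    }
}
```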
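The three-layer delineation in the issue description can be summarized as minimal interfaces. These signatures are hypothetical simplifications, not the actual Kafka APIs; the point is the dependency direction and which layer knows about what.

```java
public class LayeringSketch {
    // One OffsetIndex + FileMessageSet pair; knows nothing above itself.
    interface LogSegment {
        long baseOffset();
    }

    // A totally ordered log: a sorted set of segments. Talks to segments
    // only through the LogSegment interface; knows nothing about topics.
    interface Log {
        void append(byte[] messages);
        byte[] read(long startOffset, int maxBytes);
        void truncateTo(long offset);
    }

    // The only layer aware of topics and partitions; owns creation,
    // deletion, and retention, and uses Log only via its public API.
    interface LogManager {
        Log getOrCreateLog(String topic, int partition);
        void cleanupLogs();
    }

    public static void main(String[] args) {
        System.out.println("LogManager -> Log -> LogSegment");
    }
}
```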