Repository: kafka
Updated Branches:
  refs/heads/0.11.0 07bd39467 -> f0cd6af6d


KAFKA-5584; Fix integer overflow in Log.size

It may lead to wrong metrics and it may break size-based retention.

Author: Gregor Uhlenheuer <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes #3521 from kongo2002/KAFKA-5584

(cherry picked from commit 2d2e9adb5d8d3805f082208ae9dd241f87566b27)
Signed-off-by: Ismael Juma <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f0cd6af6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f0cd6af6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f0cd6af6

Branch: refs/heads/0.11.0
Commit: f0cd6af6d94ad61e8b3fc257b89d574db2029c86
Parents: 07bd394
Author: Gregor Uhlenheuer <[email protected]>
Authored: Wed Jul 12 16:56:09 2017 -0700
Committer: Ismael Juma <[email protected]>
Committed: Wed Jul 12 17:38:43 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala          | 11 ++++++++++-
 core/src/test/scala/unit/kafka/log/LogTest.scala | 13 +++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f0cd6af6/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 176a268..824d302 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1144,7 +1144,7 @@ class Log(@volatile var dir: File,
   /**
    * The size of the log in bytes
    */
-  def size: Long = logSegments.map(_.size).sum
+  def size: Long = Log.sizeInBytes(logSegments)
 
   /**
    * The offset metadata of the next message that will be appended to the log
@@ -1648,6 +1648,15 @@ object Log {
     filename.substring(0, filename.indexOf('.')).toLong
 
   /**
+   * Calculate a log's size (in bytes) based on its log segments
+   *
+   * @param segments The log segments to calculate the size of
+   * @return Sum of the log segments' sizes (in bytes)
+   */
+  def sizeInBytes(segments: Iterable[LogSegment]): Long =
+    segments.map(_.size.toLong).sum
+
+  /**
    * Parse the topic and partition out of the directory name of a log
    */
   def parseTopicPartitionName(dir: File): TopicPartition = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f0cd6af6/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 65a4eeb..008cd27 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -199,6 +199,19 @@ class LogTest {
   }
 
   @Test
+  def testSizeForLargeLogs(): Unit = {
+    val largeSize = Int.MaxValue.toLong * 2
+    val logSegment = EasyMock.createMock(classOf[LogSegment])
+
+    EasyMock.expect(logSegment.size).andReturn(Int.MaxValue).anyTimes
+    EasyMock.replay(logSegment)
+
+    assertEquals(Int.MaxValue, Log.sizeInBytes(Seq(logSegment)))
+    assertEquals(largeSize, Log.sizeInBytes(Seq(logSegment, logSegment)))
+    assertTrue(Log.sizeInBytes(Seq(logSegment, logSegment)) > Int.MaxValue)
+  }
+
+  @Test
   def testPidMapOffsetUpdatedForNonIdempotentData() {
     val log = createLog(2048)
     val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)))
