[
https://issues.apache.org/jira/browse/KAFKA-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643791#comment-16643791
]
ASF GitHub Bot commented on KAFKA-7366:
---
junrao closed pull request #5728: KAFKA-7366: Make topic configs segment.bytes
and segment.ms to take effect immediately
URL: https://github.com/apache/kafka/pull/5728
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/core/src/main/scala/kafka/log/Log.scala
b/core/src/main/scala/kafka/log/Log.scala
index 094473a8e26..bc328d77efc 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -145,6 +145,26 @@ case class CompletedTxn(producerId: Long, firstOffset:
Long, lastOffset: Long, i
}
}
+/**
+ * A class used to hold params required to decide to rotate a log segment or
not.
+ */
+case class RollParams(maxSegmentMs: Long,
+ maxSegmentBytes: Int,
+ maxTimestampInMessages: Long,
+ maxOffsetInMessages: Long,
+ messagesSize: Int,
+ now: Long)
+
+object RollParams {
+ def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int,
now: Long): RollParams = {
+ new RollParams(config.segmentMs,
+ config.segmentSize,
+ appendInfo.maxTimestamp,
+ appendInfo.lastOffset,
+ messagesSize, now)
+ }
+}
+
/**
* An append-only log for storing messages.
*
@@ -1493,7 +1513,7 @@ class Log(@volatile var dir: File,
val maxTimestampInMessages = appendInfo.maxTimestamp
val maxOffsetInMessages = appendInfo.lastOffset
-if (segment.shouldRoll(messagesSize, maxTimestampInMessages,
maxOffsetInMessages, now)) {
+if (segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now)))
{
debug(s"Rolling new log segment (log_size =
${segment.size}/${config.segmentSize}}, " +
s"offset_index_size =
${segment.offsetIndex.entries}/${segment.offsetIndex.maxEntries}, " +
s"time_index_size =
${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala
b/core/src/main/scala/kafka/log/LogSegment.scala
index 80763a8d797..d910a29100c 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -45,8 +45,10 @@ import scala.math._
* @param log The file records containing log entries
* @param offsetIndex The offset index
* @param timeIndex The timestamp index
+ * @param txnIndex The transaction index
* @param baseOffset A lower bound on the offsets in this segment
* @param indexIntervalBytes The approximate number of bytes between entries
in the index
+ * @param rollJitterMs The maximum random jitter subtracted from the scheduled
segment roll time
* @param time The time instance
*/
@nonthreadsafe
@@ -57,15 +59,13 @@ class LogSegment private[log] (val log: FileRecords,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
- val maxSegmentMs: Long,
- val maxSegmentBytes: Int,
val time: Time) extends Logging {
- def shouldRoll(messagesSize: Int, maxTimestampInMessages: Long,
maxOffsetInMessages: Long, now: Long): Boolean = {
-val reachedRollMs = timeWaitedForRoll(now, maxTimestampInMessages) >
maxSegmentMs - rollJitterMs
-size > maxSegmentBytes - messagesSize ||
+ def shouldRoll(rollParams: RollParams): Boolean = {
+val reachedRollMs = timeWaitedForRoll(rollParams.now,
rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs
+size > rollParams.maxSegmentBytes - rollParams.messagesSize ||
(size > 0 && reachedRollMs) ||
- offsetIndex.isFull || timeIndex.isFull ||
!canConvertToRelativeOffset(maxOffsetInMessages)
+ offsetIndex.isFull || timeIndex.isFull ||
!canConvertToRelativeOffset(rollParams.maxOffsetInMessages)
}
def resizeIndexes(size: Int): Unit = {
@@ -637,8 +637,6 @@ object LogSegment {
baseOffset,
indexIntervalBytes = config.indexInterval,
rollJitterMs = config.randomSegmentJitter,
- maxSegmentMs = config.segmentMs,
- maxSegmentBytes = config.segmentSize,
time)
}
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 40b687443a0..353e5537588 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -38,9 +38,8 @@ class L