Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph closed pull request #15634: KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO URL: https://github.com/apache/kafka/pull/15634
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
showuon commented on PR #15634: URL: https://github.com/apache/kafka/pull/15634#issuecomment-2133081717

@kamalcph , could we close this PR since https://github.com/apache/kafka/pull/15825 is merged?
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
junrao commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1586840549

## core/src/main/scala/kafka/log/UnifiedLog.scala:
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
  def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
    val endOffsetMetadata = localLog.logEndOffsetMetadata
-   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-     new LogOffsetMetadata(logStartOffset)
+   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:

> I'm not clear on this:
>
> 1. Segments are eligible for upload to remote storage only when the lastStableOffset moves beyond the segment-to-be-uploaded-end-offset.
> 2. When all the replicas lose local data (offline partition), we consider the data in remote storage lost as well. Currently, for this case, we don't have a provision to serve the remote data.
> 3. When firstUnstableOffsetMetadata is empty, we return highWatermark. With this patch, the highWatermark lower boundary is set to localLogStartOffset, so there won't be an issue.

That's true. It's just that this is yet another offset that we need to bound. I am also not sure whether there are other side effects of adjusting the HWM and LSO. Left some comments on https://github.com/apache/kafka/pull/15825.
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1582149811

## core/src/main/scala/kafka/log/UnifiedLog.scala:
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
  def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
    val endOffsetMetadata = localLog.logEndOffsetMetadata
-   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-     new LogOffsetMetadata(logStartOffset)
+   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment: Opened draft PR #15825 with the suggested approach. PTAL.
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1582144306

## core/src/main/scala/kafka/log/UnifiedLog.scala:
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
  def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
    val endOffsetMetadata = localLog.logEndOffsetMetadata
-   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-     new LogOffsetMetadata(logStartOffset)
+   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:

> For example, if we lose the local data in all replicas, the lastStableOffset could still be in the middle of a tiered segment and moving it to localLogStartOffset immediately will be incorrect.

I'm not clear on this:

1. Segments are eligible for upload to remote storage only when the `lastStableOffset` moves beyond the segment-to-be-uploaded-end-offset.
2. When all the replicas lose local data (offline partition), we consider the data in remote storage lost as well. Currently, for this case, we don't have a provision to serve the remote data.
3. When `firstUnstableOffsetMetadata` is empty, we return `highWatermark`. With this patch, the `highWatermark` lower boundary is set to `localLogStartOffset`, so there won't be an issue.

> Note that OffsetMetadata (segmentBaseOffset and relativePositionInSegment) is only used in DelayedFetch for estimating the amount of available bytes.

The [LogOffsetMetadata#onOlderSegment](https://sourcegraph.com/github.com/apache/kafka@5de5d967adffd864bad3ec729760a430253abf38/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java?L54) method is used in the [hot-path](https://sourcegraph.com/github.com/apache/kafka@5de5d967adffd864bad3ec729760a430253abf38/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L324) of incrementing the high-watermark and expects the full metadata; otherwise it throws an error. Is it OK to remove the throw from the LogOffsetMetadata#onOlderSegment method and return `false` when only the message offset (`messageOffsetOnly`) is available?
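[Editor's note] To make the proposal above concrete, here is a minimal Scala sketch of the suggested `onOlderSegment` behavior. It is a hypothetical stand-in (the real `LogOffsetMetadata` is a Java class under `storage/src/main/java`), not the actual implementation:

```scala
// Hypothetical, simplified stand-in for the Java class
// org.apache.kafka.storage.internals.log.LogOffsetMetadata; names mirror the discussion.
final case class LogOffsetMetadata(messageOffset: Long,
                                   segmentBaseOffset: Long = -1L,
                                   relativePositionInSegment: Int = 0) {

  // True when only the message offset is known (no segment position info).
  def messageOffsetOnly: Boolean = segmentBaseOffset < 0L

  // Proposed behavior: instead of throwing when either side lacks segment info,
  // conservatively answer "not known to be on an older segment".
  def onOlderSegment(that: LogOffsetMetadata): Boolean =
    if (messageOffsetOnly || that.messageOffsetOnly) false
    else this.segmentBaseOffset < that.segmentBaseOffset
}
```

Answering `false` here simply means "not known to be on an older segment", which errs on the side of leaving the high-watermark's full metadata unresolved rather than failing the hot path.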
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1571126880

## core/src/main/scala/kafka/log/UnifiedLog.scala:
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
  def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
    val endOffsetMetadata = localLog.logEndOffsetMetadata
-   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-     new LogOffsetMetadata(logStartOffset)
+   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment: Thanks for suggesting the alternative approach. I'll check and come back on this.
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on PR #15634: URL: https://github.com/apache/kafka/pull/15634#issuecomment-2064647379

Thanks @chia7712 for the review!

> log-start-offset-checkpoint is missing and remote storage is enabled. The logStartOffset will be set to zero, and it seems to be a potential issue since the ListOffsetRequest could get an incorrect result.

Most of the time, when the follower joins the ISR, it updates the log-start-offset and high-watermark from the leader FETCH response. The issue can happen only when the follower gets elected as leader before updating its state, as mentioned in the summary/comments.

When the `log-start-offset-checkpoint` file is missing:

1. For a normal topic, the log-start-offset will be set to the base-offset of the first log segment, so there is no issue. Since the data is there, reads won't fail.
2. For a remote topic, the log-start-offset will be stale for some time until the RemoteLogManager [updates](https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L671) it, so the issue is intermittent and self-recovers.

> replication-offset-checkpoint is missing and remote storage is enabled. This is what you described. The HWM is pointed to the middle of tiered storage and so it causes an error when fetching records from local segments.

This is not an issue for a normal topic. But for a cluster with remote storage enabled, if the issue happens on even one partition, it starts to affect a *subset* of topics: the controller batches the partitions in the LeaderAndIsr request, and if the broker fails to process the LISR for one partition, the remaining partitions in that batch won't be processed. The producers producing to those topics will start receiving the NOT_LEADER_FOR_PARTITION error.
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
chia7712 commented on PR #15634: URL: https://github.com/apache/kafka/pull/15634#issuecomment-2063727744

Sorry, the story I mentioned above seems to be another issue. Let me summarize my thoughts.

1. `log-start-offset-checkpoint` is missing and remote storage is enabled. The `logStartOffset` will be set to zero, and it seems to be a potential issue since the `ListOffsetRequest` could get an incorrect result.
2. `replication-offset-checkpoint` is missing and remote storage is enabled. This is what you described. The HWM is pointed to the middle of tiered storage and so it causes an error when fetching records from local segments.
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
chia7712 commented on PR #15634: URL: https://github.com/apache/kafka/pull/15634#issuecomment-2063435567

> HWM is set to localLogStartOffset in [UnifiedLog#updateLocalLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@f895ab5145077c5efa10a4a898628d901b01e2c2/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L162), then we load the HWM from the checkpoint file in [Partition#createLog](https://sourcegraph.com/github.com/apache/kafka@f895ab5145077c5efa10a4a898628d901b01e2c2/-/blob/core/src/main/scala/kafka/cluster/Partition.scala?L495). If the HWM checkpoint file is missing / does not contain the entry for the partition, then the default value of 0 is taken. If 0 < LogStartOffset (LSO), then LSO is assumed as HWM. Thus, the non-monotonic update of high-watermark from LLSO to LSO can happen.

Pardon me, I'm a bit confused about this. Please feel free to correct me to help me catch up :smile:

### case 0: the checkpoint file is missing and the remote storage is **disabled**

The LSO is initialized to LLSO (https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/core/src/main/scala/kafka/log/LogLoader.scala#L180), so I don't understand why the non-monotonic update happens. After all, LLSO and LSO are the same in this scenario.

### case 1: the checkpoint file is missing and the remote storage is **enabled**

The LSO is initialized to `logStartOffsetCheckpoint`, which is 0 since there are no checkpoint files (https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/core/src/main/scala/kafka/log/LogLoader.scala#L178). And then the HWM will be updated to LLSO, which is larger than zero (https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/core/src/main/scala/kafka/log/UnifiedLog.scala#L172). This could be a problem when [Partition#createLog](https://sourcegraph.com/github.com/apache/kafka@f895ab5145077c5efa10a4a898628d901b01e2c2/-/blob/core/src/main/scala/kafka/cluster/Partition.scala?L495) gets called, since the HWM is changed from LLSO (non-zero) to LSO (zero). Also, the incorrect HWM causes an error in `convertToOffsetMetadataOrThrow`.

If I understand correctly, it seems the root cause is that "when the checkpoint files are not working, we will initialize a `UnifiedLog` with an incorrect LSO". So could we fix that by re-building `logStartOffsets` from remote storage when the checkpoint is not working (https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/core/src/main/scala/kafka/log/LogManager.scala#L459)?
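[Editor's note] A minimal sketch of the initialization difference the two cases above describe, paraphrasing the linked LogLoader.scala lines; the names are illustrative, not the actual API:

```scala
// Illustrative paraphrase of the LogLoader start-offset initialization referenced above.
def initialLogStartOffset(remoteLogEnabled: Boolean,
                          logStartOffsetCheckpoint: Long, // 0L when the checkpoint file is missing
                          firstLocalSegmentBaseOffset: Long): Long =
  if (remoteLogEnabled)
    logStartOffsetCheckpoint // case 1: stays 0, so LSO diverges from LLSO
  else
    // case 0: clamped up to the first local segment, so LSO == LLSO
    math.max(logStartOffsetCheckpoint, firstLocalSegmentBaseOffset)
```

With remote storage disabled, the clamp keeps LSO and LLSO identical, which is why case 0 cannot produce the non-monotonic update.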
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
junrao commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1569562746

## core/src/main/scala/kafka/log/UnifiedLog.scala:
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
  def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
    val endOffsetMetadata = localLog.logEndOffsetMetadata
-   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-     new LogOffsetMetadata(logStartOffset)
+   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment: @kamalcph : Thanks for the explanation. I understand the problem now.

As for the fix, it seems that it could work for the HWM. However, I am not sure that we could always do the same thing for the LastStableOffset. For example, if we lose the local data in all replicas, the lastStableOffset could still be in the middle of a tiered segment and moving it to localLogStartOffset immediately would be incorrect.

Here is another potential approach. Note that OffsetMetadata (segmentBaseOffset and relativePositionInSegment) is only used in DelayedFetch for estimating the amount of available bytes. If occasionally the OffsetMetadata is not available, we don't have to force an exception in convertToOffsetMetadataOrThrow(). Instead, we can leave the OffsetMetadata empty and just use a conservative 1 byte for estimating the amount of available bytes. This approach would apply to both the HWM and the LSO. The inaccurate byte estimate will be ok as long as it's infrequent. What do you think?
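[Editor's note] A minimal sketch of the fallback described above, under assumed names (this is not the actual DelayedFetch code): when either offset carries only a message offset, estimate one available byte instead of forcing the conversion that can throw.

```scala
// Illustrative sketch only; field and function names are assumptions.
final case class OffsetMeta(messageOffset: Long,
                            segmentBaseOffset: Long = -1L,
                            relativePositionInSegment: Int = 0) {
  def messageOffsetOnly: Boolean = segmentBaseOffset < 0L
}

def estimateAvailableBytes(fetchOffset: OffsetMeta, endOffset: OffsetMeta): Long =
  if (fetchOffset.messageOffsetOnly || endOffset.messageOffsetOnly)
    1L // conservative: assume one byte rather than converting (and possibly throwing)
  else if (fetchOffset.segmentBaseOffset == endOffset.segmentBaseOffset)
    (endOffset.relativePositionInSegment - fetchOffset.relativePositionInSegment).toLong
  else
    Long.MaxValue // offsets sit on different segments; treat as "plenty of bytes"
```

The estimate only influences when a delayed fetch is considered satisfied, so an occasional one-byte guess slightly delays completion rather than causing an error.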
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1569356239

## core/src/main/scala/kafka/log/UnifiedLog.scala:
@@ -1223,6 +1223,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
        s"but we only have log segments starting from offset: $logStartOffset.")
  }

+ private def checkLocalLogStartOffset(offset: Long): Unit = {

Review Comment: Agree on this. The `checkLocalLogStartOffset` is used only in the `convertToOffsetMetadataOrThrow` method, which reads from the local disk.
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1563885907

## core/src/main/scala/kafka/log/UnifiedLog.scala:
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
  def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
    val endOffsetMetadata = localLog.logEndOffsetMetadata
-   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-     new LogOffsetMetadata(logStartOffset)
+   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:

> In the rare case, the restarted broker is elected as the leader before caught up through unclean election. Is this the case that you want to address?

Yes, we want to address this case too. And the issue can also happen during a clean preferred-leader-election:

```
Call stack:

The replica (1002) has full data but the HW is invalid, so the fetch-offset will be equal to LeaderLog(1001).highWatermark.

Leader (1001):
KafkaApis.handleFetchRequest
ReplicaManager.fetchMessages
ReplicaManager.readFromLocalLog
Partition.fetchRecords
Partition.updateFollowerFetchState
Partition.maybeExpandIsr
Partition.submitAlterPartition
...
...
...
# If there is not enough data to respond and there is no remote data, we will let the fetch request wait for new data.
# parks the request in the DelayedFetchPurgatory

Another thread runs Preferred-Leader-Election in the controller (1003); since replica 1002 joined the ISR list, it can be elected as the preferred leader. The controller sends LeaderAndIsr requests to all the brokers.

KafkaController.processReplicaLeaderElection
KafkaController.onReplicaElection
PartitionStateMachine.handleStateChanges
PartitionStateMachine.doHandleStateChanges
PartitionStateMachine.electLeaderForPartitions
ControllerChannelManager.sendRequestsToBrokers

Replica 1002 gets elected as leader but has an invalid highWatermark, since it didn't process the fetch-response from the previous leader 1001, and throws an OFFSET_OUT_OF_RANGE error when processing the LeaderAndIsr request. Note that in a LeaderAndIsr request, even if one partition fails, the remaining partitions in that request won't be processed.

KafkaApis.handleLeaderAndIsrRequest
ReplicaManager.becomeLeaderOrFollower
ReplicaManager.makeLeaders
Partition.makeLeader
Partition.maybeIncrementLeaderHW
UnifiedLog.maybeIncrementHighWatermark (LeaderLog)
UnifiedLog.fetchHighWatermarkMetadata

The controller assumes that the current leader for tp0 is 1002, but broker 1002 couldn't process the LISR. The controller retries the LISR until broker 1002 becomes leader for tp0. During this time, the producers won't be able to send messages, as node 1002 returns the NOT_LEADER_FOR_PARTITION error-code to the producer.

During this time, if a follower sends a FETCH request to read from the current leader 1002, then an OFFSET_OUT_OF_RANGE error will be returned by the leader:

KafkaApis.handleFetchRequest
ReplicaManager.fetchMessages
ReplicaManager.readFromLog
Partition.fetchRecords # readFromLocalLog
Partition.updateFollowerFetchState
Partition.maybeIncrementLeaderHW
LeaderLog.maybeIncrementHighWatermark
UnifiedLog.fetchHighWatermarkMetadata
UnifiedLog.convertToOffsetMetadataOrThrow
LocalLog.convertToOffsetMetadataOrThrow
LocalLog.read # OffsetOutOfRangeException exception
```
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1569328393

## core/src/main/scala/kafka/log/UnifiedLog.scala:
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
  def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
    val endOffsetMetadata = localLog.logEndOffsetMetadata
-   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-     new LogOffsetMetadata(logStartOffset)
+   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:

> For the makeLeaders path, it will call UnifiedLog.convertToOffsetMetadataOrThrow. Within it, checkLogStartOffset(offset) shouldn't throw OFFSET_OUT_OF_RANGE since we are comparing the offset with logStartOffset. Do you know which part throws OFFSET_OUT_OF_RANGE error?

The next line, `localLog.convertToOffsetMetadataOrThrow`, in the [convertToOffsetMetadataOrThrow](https://sourcegraph.com/github.com/apache/kafka@a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L1429) method reads the segment from disk, and that is where it throws the error. When the call [segments.floorSegment(startOffset)](https://sourcegraph.com/github.com/apache/kafka@a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/-/blob/core/src/main/scala/kafka/log/LocalLog.scala?L366) in LocalLog fails to find the segment for the log-start-offset, an OffsetOutOfRangeException is thrown.

> For the follower fetch path, it's bounded by LogEndOffset. So it shouldn't need to call UnifiedLog.fetchHighWatermarkMetadata, right? The regular consumer will call UnifiedLog.fetchHighWatermarkMetadata.

Yes, you're right. I attached the wrong call stack for handling the follower request. Please find the updated call stack below.

The leader with an invalid high-watermark handles the FETCH request from the follower and throws an OFFSET_OUT_OF_RANGE error:

```
KafkaApis.handleFetchRequest
ReplicaManager.fetchMessages
ReplicaManager.readFromLog
Partition.fetchRecords # readFromLocalLog
Partition.updateFollowerFetchState
Partition.maybeIncrementLeaderHW
LeaderLog.maybeIncrementHighWatermark
UnifiedLog.fetchHighWatermarkMetadata
UnifiedLog.convertToOffsetMetadataOrThrow
LocalLog.convertToOffsetMetadataOrThrow
LocalLog.read # OffsetOutOfRangeException exception
```
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
junrao commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1567796843

## core/src/main/scala/kafka/log/UnifiedLog.scala:
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
  def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
    val endOffsetMetadata = localLog.logEndOffsetMetadata
-   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-     new LogOffsetMetadata(logStartOffset)
+   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment: Thanks for the detailed explanation.

For the `makeLeaders` path, it will call `UnifiedLog.convertToOffsetMetadataOrThrow`. Within it, `checkLogStartOffset(offset)` shouldn't throw OFFSET_OUT_OF_RANGE since we are comparing the offset with logStartOffset. Do you know which part throws the OFFSET_OUT_OF_RANGE error?

For the follower fetch path, it's bounded by `LogEndOffset`. So it shouldn't need to call `UnifiedLog.fetchHighWatermarkMetadata`, right? The regular consumer will call `UnifiedLog.fetchHighWatermarkMetadata`.
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
chia7712 commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1567414802

## core/src/main/scala/kafka/log/UnifiedLog.scala:
@@ -1223,6 +1223,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
        s"but we only have log segments starting from offset: $logStartOffset.")
  }

+ private def checkLocalLogStartOffset(offset: Long): Unit = {

Review Comment: It seems reading records between [logStartOffset, localLogStartOffset] is dangerous, since the segment won't be on the local disk. That is a bit chaotic to me, as `UnifiedLog` presents a unified view of local and tiered log segments (https://github.com/apache/kafka/blob/fccd7fec666d6570758e0b7891771099240ceee8/core/src/main/scala/kafka/log/UnifiedLog.scala#L59). The check looks like a limitation: we can't "view" data from the tiered log segments.
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
junrao commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1562991673

## core/src/main/scala/kafka/log/UnifiedLog.scala:
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
  def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
    val endOffsetMetadata = localLog.logEndOffsetMetadata
-   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-     new LogOffsetMetadata(logStartOffset)
+   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment: Yes, if replication-offset-checkpoint is corrupted, HWM could temporarily be set to below local-log-start-offset. I am still trying to understand the impact of that. In the common case, the restarted broker can't become the leader or serve reads until it's caught up. At that time, the HWM will be up to date. In the rare case, the restarted broker is elected as the leader before caught up through unclean election. Is this the case that you want to address?

The jira also says:

> If the high watermark is less than the local-log-start-offset, then the [UnifiedLog#fetchHighWatermarkMetadata](https://sourcegraph.com/github.com/apache/kafka@d4caa1c10ec81b9c87eaaf52b73c83d5579b68d3/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L358) method will throw the OFFSET_OUT_OF_RANGE error when it converts the offset to metadata. Once this error happens, the followers will receive out-of-range exceptions and the producers won't be able to produce messages since the leader cannot move the high watermark.

However, the follower read is bounded by logEndOffset, not HWM? Where does the follower read need to convert HWM to metadata?

## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
@@ -318,6 +318,80 @@ class UnifiedLogTest {
    assertHighWatermark(4L)
  }

+  @Test
+  def testHighWatermarkMaintenanceForRemoteTopic(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024, remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+    val leaderEpoch = 0
+
+    def assertHighWatermark(offset: Long): Unit = {
+      assertEquals(offset, log.highWatermark)
+      assertValidLogOffsetMetadata(log, log.fetchOffsetSnapshot.highWatermark)
+    }
+
+    // High watermark initialized to 0
+    assertHighWatermark(0L)
+
+    var offset = 0L
+    for (_ <- 0 until 50) {
+      val records = TestUtils.singletonRecords("test".getBytes())
+      val info = log.appendAsLeader(records, leaderEpoch)
+      offset = info.lastOffset
+      if (offset != 0 && offset % 10 == 0)
+        log.roll()
+    }
+    assertEquals(5, log.logSegments.size)
+
+    // High watermark not changed by append
+    assertHighWatermark(0L)
+
+    // Update high watermark as leader
+    log.maybeIncrementHighWatermark(new LogOffsetMetadata(50L))
+    assertHighWatermark(50L)
+    assertEquals(50L, log.logEndOffset)
+
+    // Cannot update high watermark past the log end offset
+    log.updateHighWatermark(60L)
+    assertHighWatermark(50L)
+
+    // simulate calls to upload 3 segments to remote storage and remove them from local-log.
+    log.updateHighestOffsetInRemoteStorage(30)
+    log.maybeIncrementLocalLogStartOffset(31L, LogStartOffsetIncrementReason.SegmentDeletion)
+    log.deleteOldSegments()
+    assertEquals(2, log.logSegments.size)
+    assertEquals(31L, log.localLogStartOffset())
+    assertHighWatermark(50L)
+
+    // simulate one remote-log segment deletion
+    val logStartOffset = 11L
+    log.maybeIncrementLogStartOffset(logStartOffset, LogStartOffsetIncrementReason.SegmentDeletion)
+    assertEquals(11, log.logStartOffset)
+
+    // Updating the HW below the log-start-offset / local-log-start-offset is not allowed. HW should reset to local-log-start-offset.
+    log.updateHighWatermark(new LogOffsetMetadata(5L))
+    assertHighWatermark(31L)
+    // Updating the HW between log-start-offset and local-log-start-offset is not allowed. HW should reset to local-log-start-offset.

Review Comment: This is moving HW below local-log-start-offset, not log-start-offset.
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on PR #15634: URL: https://github.com/apache/kafka/pull/15634#issuecomment-2050934793

@junrao @showuon @divijvaidya Gentle bump to review the diff, thanks!
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1557089094

## core/src/main/scala/kafka/log/UnifiedLog.scala:
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
  def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
    val endOffsetMetadata = localLog.logEndOffsetMetadata
-   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-     new LogOffsetMetadata(logStartOffset)
+   val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:

> when will we set HWM to be lower than _localLogStartOffset?

This can happen when recovering the partition after an ungraceful shutdown and the replication-offset-checkpoint file is missing/corrupted. When the broker comes online, the HWM is set to localLogStartOffset in [UnifiedLog#updateLocalLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@f895ab5145077c5efa10a4a898628d901b01e2c2/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L162); then we load the HWM from the checkpoint file in [Partition#createLog](https://sourcegraph.com/github.com/apache/kafka@f895ab5145077c5efa10a4a898628d901b01e2c2/-/blob/core/src/main/scala/kafka/cluster/Partition.scala?L495). If the HWM checkpoint file is missing / does not contain an entry for the partition, then the default value of 0 is taken. If 0 < LogStartOffset (LSO), then the LSO is assumed as the HWM. Thus, the non-monotonic update of the high-watermark from LLSO to LSO can happen.
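[Editor's note] A toy replay of that recovery sequence, with offsets borrowed from the test in this PR (31 = LLSO after local deletion, 11 = LSO); the variable names and steps are illustrative, not the actual broker code:

```scala
object HwmRecoveryExample extends App {
  val logStartOffset = 11L        // LSO still points into tiered storage
  val localLogStartOffset = 31L   // LLSO after uploaded segments were deleted locally

  // Step 1: on load, UnifiedLog#updateLocalLogStartOffset seeds the HWM with the LLSO.
  var highWatermark = localLogStartOffset // HWM = 31

  // Step 2: Partition#createLog re-reads the HWM from replication-offset-checkpoint.
  // A missing file (or missing entry) defaults to 0, which is then clamped up to the LSO.
  val checkpointedHighWatermark = 0L
  highWatermark = math.max(checkpointedHighWatermark, logStartOffset) // HWM regresses to 11

  assert(highWatermark < localLogStartOffset) // the non-monotonic LLSO -> LSO update
}
```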
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
junrao commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1556593959

## core/src/main/scala/kafka/log/UnifiedLog.scala:
## @@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   Hmm, when will we set HWM to be lower than _localLogStartOffset?

   In `UnifiedLog.deletableSegments()`, we have the following code that bounds the retention-based deletion by highWatermark. When updating highWatermark, the value typically increases.

   `val predicateResult = highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt)`
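Roughly, the bound junrao quotes means a segment is only deletable once the high watermark has passed its last offset. A simplified sketch of that guard (`LogSegment` here is a stand-in for kafka.log.LogSegment, and the function is illustrative, not the actual `deletableSegments` signature):

```scala
final case class LogSegment(baseOffset: Long) // stand-in for kafka.log.LogSegment

def eligibleForDeletion(segment: LogSegment,
                        nextSegmentOpt: Option[LogSegment],
                        highWatermark: Long,
                        logEndOffset: Long,
                        predicate: (LogSegment, Option[LogSegment]) => Boolean): Boolean = {
  // every offset of `segment` lies below the next segment's base offset
  val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(logEndOffset)
  // a segment may only be retention-deleted once the HW has passed all of it,
  // so a regressed (too-low) HW would silently block local retention as well
  highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt)
}
```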
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
satishd commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1546167963

## core/src/main/scala/kafka/log/UnifiedLog.scala:
## @@ -136,16 +136,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    */
   @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None
 
+  @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None

Review Comment:
   nit: You can leave this field at its earlier place, as moving it is not really needed for this change.

## core/src/main/scala/kafka/log/UnifiedLog.scala:
## @@ -136,16 +136,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    */
   @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None
 
+  @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None
+
+  @volatile private[kafka] var _localLogStartOffset: Long = logStartOffset
+
   /* Keep track of the current high watermark in order to ensure that segments containing offsets at or above it are
    * not eligible for deletion. This means that the active segment is only eligible for deletion if the high watermark
    * equals the log end offset (which may never happen for a partition under consistent load). This is needed to
    * prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark.
    */
-  @volatile private var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(logStartOffset)
-
-  @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None
-
-  @volatile private[kafka] var _localLogStartOffset: Long = logStartOffset
+  @volatile private var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(_localLogStartOffset)

Review Comment:
   This change has no effect, as `_localLogStartOffset` is initialized with `logStartOffset`. Still, it is good to use `_localLogStartOffset` here for consistency and for the relevance of this field.

## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
## @@ -318,6 +318,80 @@ class UnifiedLogTest {
     assertHighWatermark(4L)
   }
 
+  @Test
+  def testHighWatermarkMaintenanceForRemoteTopic(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024, remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+    val leaderEpoch = 0
+
+    def assertHighWatermark(offset: Long): Unit = {
+      assertEquals(offset, log.highWatermark)
+      assertValidLogOffsetMetadata(log, log.fetchOffsetSnapshot.highWatermark)
+    }
+
+    // High watermark initialized to 0
+    assertHighWatermark(0L)
+
+    var offset = 0L
+    for(_ <- 0 until 50) {
+      val records = TestUtils.singletonRecords("test".getBytes())
+      val info = log.appendAsLeader(records, leaderEpoch)
+      offset = info.lastOffset
+      if (offset != 0 && offset % 10 == 0)
+        log.roll()
+    }
+    assertEquals(5, log.logSegments.size)
+
+    // High watermark not changed by append
+    assertHighWatermark(0L)
+
+    // Update high watermark as leader
+    log.maybeIncrementHighWatermark(new LogOffsetMetadata(50L))
+    assertHighWatermark(50L)
+    assertEquals(50L, log.logEndOffset)
+
+    // Cannot update high watermark past the log end offset
+    log.updateHighWatermark(60L)
+    assertHighWatermark(50L)
+
+    // simulate calls to upload 3 segments to remote storage and remove them from local-log.
+    log.updateHighestOffsetInRemoteStorage(30)
+    log.maybeIncrementLocalLogStartOffset(31L, LogStartOffsetIncrementReason.SegmentDeletion)
+    log.deleteOldSegments()
+    assertEquals(2, log.logSegments.size)
+    assertEquals(31L, log.localLogStartOffset())
+    assertHighWatermark(50L)
+
+    // simulate one remote-log segment deletion
+    val logStartOffset = 11L
+    log.maybeIncrementLogStartOffset(logStartOffset, LogStartOffsetIncrementReason.SegmentDeletion)
+    assertEquals(11, log.logStartOffset)
+
+    // Updating the HW below the log-start-offset / local-log-start-offset is not allowed. HW should reset to local-log-start-offset.
+    log.updateHighWatermark(new LogOffsetMetadata(5L))
+    assertHighWatermark(31L)
+    // Updating the HW between log-start-offset and local-log-start-offset is not allowed. HW should reset to local-log-start-offset.
+    log.updateHighWatermark(new LogOffsetMetadata(25L))
+    assertHighWatermark(31L)
+    // Updating the HW between local-log-start-offset and log-end-offset is allowed.
+    log.updateHighWatermark(new LogOffsetMetadata(32L))
+    assertHighWatermark(32L)
+    assertEquals(11L, log.logStartOffset)
+    assertEquals(31L, log.localLogStartOffset())
+
+    // Truncating the logs to below the local-log-start-offset, should update the high watermark

Review Comment:
   Good to see the truncation scenarios covered as well.
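To see why satishd notes the initializer swap has no effect at construction time, consider a stripped-down version (a sketch, not the real class):

```scala
// Sketch: at construction, LLSO == LSO, so both initializers yield the same HW.
class MiniLog(@volatile var logStartOffset: Long) {
  // equal to logStartOffset when the log is first created
  @volatile private var _localLogStartOffset: Long = logStartOffset
  // so this produces the same initial value as new LogOffsetMetadata(logStartOffset)
  @volatile private var highWatermark: Long = _localLogStartOffset
  // the two offsets diverge only later, once segments are uploaded to remote
  // storage and deleted locally (LLSO advances past LSO)
}
```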
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on PR #15634: URL: https://github.com/apache/kafka/pull/15634#issuecomment-2029474481

   > Just curious. Does it happen only if remote storage is enabled? According to the description:
   >
   > > The follower sends the first FETCH request to the leader; the leader checks whether the follower is in sync (isFollowerInSync), then expands the ISR. It also parks the request in the DelayedFetchPurgatory. If the replica is elected as leader before the fetch response gets processed, then the new leader will have the wrong high-watermark.
   >
   > It looks like the issue exists even when we don't use remote storage.

   For a normal topic, once the replica becomes leader, it is able to [resolve/convert](https://sourcegraph.com/github.com/apache/kafka@40e87ae35beb389d6419d32130174d7c68fa4d19/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala#L319) the high-watermark offset (log-start-offset) to metadata by reading the segment from disk, and then it updates the high-watermark to either the current leader's log-end-offset (or) the lowest LEO of all the eligible ISR replicas.

   In the case of a remote topic, the replica will fail to [resolve](https://sourcegraph.com/github.com/apache/kafka@40e87ae35beb389d6419d32130174d7c68fa4d19/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala#L319) the high-watermark offset (log-start-offset) to metadata since the segment won't be on local disk, and will then fail continuously.
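The resolve/convert step being referenced can be pictured with a small illustrative function (not the actual UnifiedLog code; the field names and exception are simplified stand-ins):

```scala
final case class LogOffsetMetadata(messageOffset: Long,
                                   segmentBaseOffset: Long,
                                   relativePositionInSegment: Int)

// Converting a bare offset into full metadata requires reading the owning
// segment from local disk. Below the local-log-start-offset there is no such
// segment for a tiered topic, so the conversion can only fail.
def convertToOffsetMetadata(offset: Long,
                            localLogStartOffset: Long,
                            localSegmentBaseOffsets: Seq[Long]): LogOffsetMetadata = {
  if (offset < localLogStartOffset)
    throw new IllegalStateException( // stands in for OFFSET_OUT_OF_RANGE
      s"offset $offset is below local-log-start-offset $localLogStartOffset")
  val base = localSegmentBaseOffsets.filter(_ <= offset).max // floor lookup of the owning segment
  LogOffsetMetadata(offset, base, (offset - base).toInt)     // byte position simplified for the sketch
}
```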
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
chia7712 commented on PR #15634: URL: https://github.com/apache/kafka/pull/15634#issuecomment-2029310080

   Just curious. Does it happen only if remote storage is enabled? According to the description:

   > The follower sends the first FETCH request to the leader; the leader checks whether the follower is in sync (isFollowerInSync), then expands the ISR. It also parks the request in the DelayedFetchPurgatory. If the replica is elected as leader before the fetch response gets processed, then the new leader will have the wrong high-watermark.

   It looks like the issue exists even when we don't use remote storage.
[PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph opened a new pull request, #15634: URL: https://github.com/apache/kafka/pull/15634

   Bound the high-watermark offset between the local-log-start-offset and the log-end-offset.

   The high watermark should not go below the local-log-start-offset. If the high watermark is less than the local-log-start-offset, then the UnifiedLog#fetchHighWatermarkMetadata method will throw an OFFSET_OUT_OF_RANGE error when it converts the offset to metadata. Once this error happens, the followers will receive out-of-range exceptions and the producers won't be able to produce messages, since the leader cannot move the high watermark.

   This issue can happen when the partition undergoes recovery due to corruption in the checkpoint file and gets elected as leader before it has a chance to update the HW from the previous leader.

   The follower sends the FETCH request to the leader; the leader checks whether the follower is in sync (isFollowerInSync), then expands the ISR. It also parks the request in the DelayedFetchPurgatory. If the replica is elected as leader before the fetch response gets processed, then the new leader will have the wrong high-watermark.

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
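To make the failure sequence concrete, here is a hedged sketch of the race as a commented timeline, with the bounding rule from the PR title applied at the end (offsets are illustrative; `boundHw` is a stand-in, not Kafka's actual method):

```scala
object HwRaceSketch extends App {
  val llso = 31L // local-log-start-offset on the recovering replica
  val leo  = 50L // its log-end-offset

  // After recovery with a corrupted checkpoint, the HW fell back to the
  // log-start-offset (11), which sits below the local-log-start-offset (31).
  var hw = 11L

  // t1: as follower, it sends FETCH; the leader sees it in sync, expands the
  //     ISR, and parks the request in the DelayedFetchPurgatory.
  // t2: it is elected leader before the parked fetch response is processed.
  // t3: as the new leader with hw < llso, resolving the HW to metadata needs a
  //     segment that exists only in remote storage -> OFFSET_OUT_OF_RANGE;
  //     followers get out-of-range errors and producers stall because the HW
  //     cannot advance.

  // The PR's bounding rule prevents t3 by clamping the HW into [LLSO, LEO]:
  def boundHw(proposed: Long): Long = math.min(math.max(proposed, llso), leo)
  hw = boundHw(hw)
  assert(hw == llso) // 11 -> 31
}
```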