showuon merged PR #15825:
URL: https://github.com/apache/kafka/pull/15825
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail:
junrao commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2130475760
@showuon : Any other comments from you?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the
kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2129976827
Test failures are unrelated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1612186225
##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -103,7 +103,7 @@ class DelayedFetch(
// We will not force complete the fetch request if a
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1612025298
##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
assertTrue(delayedFetch.tryComplete())
showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1611591188
##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java:
##
@@ -51,23 +50,24 @@ public LogOffsetMetadata(long messageOffset,
//
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1611570051
##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -91,19 +91,23 @@ class DelayedFetch(
// Go directly to the check for Case G if the message
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1611559594
##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java:
##
@@ -51,23 +50,24 @@ public LogOffsetMetadata(long messageOffset,
//
showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1611046740
##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -144,6 +144,35 @@ class LogSegmentTest {
checkEquals(ms2.records.iterator,
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610893769
##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
assertTrue(delayedFetch.tryComplete())
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610893769
##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
assertTrue(delayedFetch.tryComplete())
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610895466
##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -91,19 +91,23 @@ class DelayedFetch(
// Go directly to the check for Case G if the message
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610893769
##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
assertTrue(delayedFetch.tryComplete())
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610572095
##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -91,19 +91,23 @@ class DelayedFetch(
// Go directly to the check for Case G if the message
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610452888
##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -91,19 +91,24 @@ class DelayedFetch(
// Go directly to the check for Case G if the message
kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2125471280
@junrao
Thanks for the review! Addressed all your comments. PTAL.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610404165
##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -445,7 +445,7 @@ public FetchDataInfo read(long startOffset, int maxSize,
long
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610404165
##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -445,7 +445,7 @@ public FetchDataInfo read(long startOffset, int maxSize,
long
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610404165
##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -445,7 +445,7 @@ public FetchDataInfo read(long startOffset, int maxSize,
long
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610401875
##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -91,19 +91,24 @@ class DelayedFetch(
// Go directly to the check for Case G if the message
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610399596
##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -143,6 +143,39 @@ class LogSegmentTest {
checkEquals(ms2.records.iterator,
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610399596
##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -143,6 +143,39 @@ class LogSegmentTest {
checkEquals(ms2.records.iterator,
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610310799
##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -143,6 +143,39 @@ class LogSegmentTest {
checkEquals(ms2.records.iterator,
kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2123824613
@junrao @showuon
Thanks for the review! Addressed all the review comments. PTAL.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609227442
##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
throw new
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609227180
##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
throw new
showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1609081346
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608986312
##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
throw new
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608984545
##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
throw new
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608834195
##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
throw new
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608831211
##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
throw new
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608831211
##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
throw new
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608823231
##
storage/src/test/java/org/apache/kafka/tiered/storage/actions/EraseBrokerStorageAction.java:
##
@@ -19,24 +19,35 @@
import
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1607381004
##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -92,7 +92,14 @@ class DelayedFetch(
// has just rolled, then the high watermark offset will
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608327384
##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,17 @@ class LocalLog(@volatile private var _dir: File,
throw new
kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2122625313
> @kamalcph , had another look, left some comments. Also, could we add an
integration test for this case like you described in the PR description?
`FetchFromFollowerIntegrationTest`
showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608010033
##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -391,8 +398,10 @@ class UnifiedLogTest {
assertNonEmptyFetch(log, offset,
showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608007659
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1608007659
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2121704898
Test failures are unrelated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1606925582
##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,12 @@ class LocalLog(@volatile private var _dir: File,
throw new
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1606711171
##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,12 @@ class LocalLog(@volatile private var _dir: File,
throw new
kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2120351527
> @kamalcph , do we have any update for this PR?
sorry for the delay. Addressed all the review comments. PTAL.
--
This is an automated message from the Apache Git Service.
To
showuon commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2119875404
@kamalcph , do we have any update for this PR?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1599161815
##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,12 @@ class LocalLog(@volatile private var _dir: File,
throw new
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1598838884
##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -391,8 +398,10 @@ class UnifiedLogTest {
assertNonEmptyFetch(log, offset,
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1598848735
##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,12 @@ class LocalLog(@volatile private var _dir: File,
throw new
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1598848735
##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -370,11 +370,12 @@ class LocalLog(@volatile private var _dir: File,
throw new
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1598841362
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1598841362
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1598838884
##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -391,8 +398,10 @@ class UnifiedLogTest {
assertNonEmptyFetch(log, offset,
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1598832131
##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
assertTrue(delayedFetch.tryComplete())
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1598832131
##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
assertTrue(delayedFetch.tryComplete())
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1597073119
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
jsancio commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2105087021
@kamalcph, looks like a bug to me. The predicate should be `if
(!hwm.messageOffsetOnly)` or the if/else blocks should be swapped. I suspect
that we haven't noticed this bug in the KRaft
kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2103912972
While going through the usages, it looks to me that the LogOffsetMetadata
conversion happens in the KafkaMetadataLog is not correct. Could someone please
double check?
```
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1596278649
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1596276757
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1596055964
##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -92,7 +92,10 @@ class DelayedFetch(
// has just rolled, then the high watermark offset will
kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2103078761
Test failures are unrelated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific
showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593525765
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593511224
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593471183
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593471183
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593471183
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2099844734
> Thanks for the PR. One question: So when we temporarily set high-watermark
as ` LogOffsetMetadata(0)` for the leader, we're waiting for the high-watermark
gets updated after followers
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593363337
##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -92,7 +92,10 @@ class DelayedFetch(
// has just rolled, then the high watermark offset will
showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593331212
##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message
kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2097869168
@junrao @satishd @chia7712 @showuon
Updated the test plan in the summary. Verified that the patch fixes the
issue by running the trunk and patched build. With the fix, the
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1586835182
##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java:
##
@@ -28,6 +28,7 @@ public final class LogOffsetMetadata {
//TODO
kamalcph opened a new pull request, #15825:
URL: https://github.com/apache/kafka/pull/15825
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade
76 matches
Mail list logo