Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1594849626

## core/src/test/scala/unit/kafka/log/LogValidatorTest.scala: ##
@@ -404,9 +414,7 @@ class LogValidatorTest {
     assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}")
-    val expectedOffsetOfMaxTimestamp = 1
-    assertEquals(expectedOffsetOfMaxTimestamp, validatingResults.offsetOfMaxTimestampMs,
-      s"Offset of max timestamp should be 1")
+    assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)

Review Comment:
see https://github.com/apache/kafka/pull/15904

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1594333143

## core/src/test/scala/unit/kafka/log/LogValidatorTest.scala: ##
@@ -404,9 +414,7 @@ class LogValidatorTest {
     assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}")
-    val expectedOffsetOfMaxTimestamp = 1
-    assertEquals(expectedOffsetOfMaxTimestamp, validatingResults.offsetOfMaxTimestampMs,
-      s"Offset of max timestamp should be 1")
+    assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)

Review Comment:
> yep, that is a "TYPO" but it does not change the test.

The issue is that the test is testing the wrong expected value. For magic of 1, the offset for max timestamp should be 1 instead of 2.

> However, I do observe a potential bug.

Yes, this can lead to an inaccurate `LogAppendInfo.sourceCompression`. But it doesn't seem to have a real impact now. `LogAppendInfo.sourceCompression` is only used in `LogValidator`, which is only called by the leader. In the leader, currently, we expect only 1 batch per producer.
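On the non-compressed magic v1 path, each record is its own shallow entry, so the offset of the max timestamp is simply the offset of the record that carries it. The Scala sketch below illustrates that rule under stated assumptions: `Rec` and `offsetOfMaxTimestamp` are hypothetical names, not `LogValidator` APIs, and ties are assumed to keep the earliest offset.

```scala
object MaxTimestampSketch {
  // Hypothetical record view: absolute offset plus the record's timestamp.
  final case class Rec(offset: Long, timestamp: Long)

  // Returns (maxTimestamp, offsetOfMaxTimestamp).
  // A strict ">" keeps the earliest offset on ties; an empty input yields (-1, -1).
  def offsetOfMaxTimestamp(records: Seq[Rec]): (Long, Long) =
    records.foldLeft((-1L, -1L)) { case ((maxTs, maxOff), r) =>
      if (r.timestamp > maxTs) (r.timestamp, r.offset) else (maxTs, maxOff)
    }
}
```

For example, with illustrative timestamps `now`, `now + 1`, `now - 1` at offsets 0, 1, 2, the sketch reports `now + 1` at offset 1, which is the value junrao expects for magic 1.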
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1593260374

## core/src/test/scala/unit/kafka/log/LogValidatorTest.scala: ##
@@ -404,9 +414,7 @@ class LogValidatorTest {
     assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}")
-    val expectedOffsetOfMaxTimestamp = 1
-    assertEquals(expectedOffsetOfMaxTimestamp, validatingResults.offsetOfMaxTimestampMs,
-      s"Offset of max timestamp should be 1")
+    assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)

Review Comment:
For another thing, `LogValidator` has already been moved to the storage module, but its unit test is still in the core module, which is a bit weird. We can rewrite the test in Java along with the bug fix and then move it to the storage module. I have filed https://issues.apache.org/jira/browse/KAFKA-16689
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1593241011

## core/src/test/scala/unit/kafka/log/LogValidatorTest.scala: ##
@@ -404,9 +414,7 @@ class LogValidatorTest {
     assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}")
-    val expectedOffsetOfMaxTimestamp = 1
-    assertEquals(expectedOffsetOfMaxTimestamp, validatingResults.offsetOfMaxTimestampMs,
-      s"Offset of max timestamp should be 1")
+    assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)

Review Comment:
yep, that is a "TYPO" but it does not change the test. We do pass "NONE" when creating `LogValidator`, so it runs the `assignOffsetsNonCompressed` path: https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala#L377

However, I do observe a potential bug.

**context**
1. Those batches can have different compression types.
2. We take the compression from the last batch: https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/log/UnifiedLog.scala#L1180

**potential bug**
topic-level compression = GZIP
batch_0 = NONE
batch_1 = GZIP

In this case, we don't rebuild the records according to the topic-level compression, since the compression of the "last batch" is equal to `GZIP`. Hence, batch_0 ends up with the wrong compression.

This bug does not produce corrupt records, so we could add comments/docs describing the issue. Alternatively, we can fix it by changing `sourceCompression` into a "collection" of all batches' compression types, and then converting if any one of them is mismatched.
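The difference between the current "last batch only" check and the proposed "collection of all batches" check can be sketched as below. This is a simplified model, not Kafka code: `Codec`, `NoneCodec`, `Gzip` and both functions are hypothetical stand-ins for `CompressionType` and the real `LogValidator` decision logic.

```scala
object SourceCompressionSketch {
  // Hypothetical stand-in for a compression codec.
  sealed trait Codec
  case object NoneCodec extends Codec
  case object Gzip extends Codec

  // Models the behaviour described in the comment: only the last batch's codec is compared.
  def needsRecompressionLastBatchOnly(batchCodecs: Seq[Codec], target: Codec): Boolean =
    batchCodecs.lastOption.exists(_ != target)

  // Models the proposed fix: compare every batch and convert if any one differs.
  def needsRecompressionAnyBatch(batchCodecs: Seq[Codec], target: Codec): Boolean =
    batchCodecs.exists(_ != target)
}
```

With the scenario above (topic GZIP, batch_0 NONE, batch_1 GZIP), the last-batch check returns `false` and skips conversion, while the all-batches check returns `true`.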
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1592859538

## core/src/test/scala/unit/kafka/log/LogValidatorTest.scala: ##
@@ -404,9 +414,7 @@ class LogValidatorTest {
     assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}")
-    val expectedOffsetOfMaxTimestamp = 1
-    assertEquals(expectedOffsetOfMaxTimestamp, validatingResults.offsetOfMaxTimestampMs,
-      s"Offset of max timestamp should be 1")
+    assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)

Review Comment:
@chia7712 : There seems to be an existing bug. The method is `checkNonCompressed()`, but in line 370, we set the compression codec to GZIP.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2046489133

@junrao @showuon thanks for all your reviews and help!
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 merged PR #15621:
URL: https://github.com/apache/kafka/pull/15621
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2046457414

```
Build / JDK 21 and Scala 2.13 / testInvalidPasswordSaslScram() – org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailureNoDelayTest
```
https://issues.apache.org/jira/browse/KAFKA-16497

```
Build / JDK 21 and Scala 2.13 / testReplicateFromLatest() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest
```
https://issues.apache.org/jira/browse/KAFKA-16383

```
Build / JDK 21 and Scala 2.13 / testAlterSinkConnectorOffsetsZombieSinkTasks – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
```
https://issues.apache.org/jira/browse/KAFKA-15917

```
Build / JDK 21 and Scala 2.13 / testGetSinkConnectorOffsets – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
```
https://issues.apache.org/jira/browse/KAFKA-16498

```
Build / JDK 21 and Scala 2.13 / testResetSinkConnectorOffsetsOverriddenConsumerGroupId – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
```
https://issues.apache.org/jira/browse/KAFKA-15891

```
Build / JDK 21 and Scala 2.13 / testCacheEviction() – org.apache.kafka.server.ClientMetricsManagerTest
```
https://issues.apache.org/jira/browse/KAFKA-16499

```
Build / JDK 21 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest
```
https://issues.apache.org/jira/browse/KAFKA-16136

```
Build / JDK 17 and Scala 2.13 / "testTrustStoreAlter(String).quorum=kraft" – kafka.server.DynamicBrokerReconfigurationTest
```
https://issues.apache.org/jira/browse/KAFKA-16500

```
Build / JDK 8 and Scala 2.12 / testConsumptionWithBrokerFailures() – kafka.api.ConsumerBounceTest
```
https://issues.apache.org/jira/browse/KAFKA-15146

```
Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
```
https://issues.apache.org/jira/browse/KAFKA-15927

```
Build / JDK 11 and Scala 2.13 / testSeparateOffsetsTopic – org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest
```
https://issues.apache.org/jira/browse/KAFKA-14089

```
Build / JDK 11 and Scala 2.13 / testConsumptionWithBrokerFailures() – kafka.api.ConsumerBounceTest
```
https://issues.apache.org/jira/browse/KAFKA-15146

```
Build / JDK 11 and Scala 2.13 / "testCreateUserWithDelegationToken(String).quorum=kraft" – kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest
```
https://issues.apache.org/jira/browse/KAFKA-16501

```
Build / JDK 11 and Scala 2.13 / "testBrokerHeartbeatDuringMigration(MetadataVersion).metadataVersion=3.4-IV0" – org.apache.kafka.controller.QuorumControllerTest
```
https://issues.apache.org/jira/browse/KAFKA-15963

```
Build / JDK 11 and Scala 2.13 / shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_v2] – org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest
```
https://issues.apache.org/jira/browse/KAFKA-16502

```
Build / JDK 11 and Scala 2.13 / testDescribeQuorumReplicationSuccessful [1] Type=Rt-Combined, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest
```
https://issues.apache.org/jira/browse/KAFKA-15104

OK, all of them pass locally, and they all have JIRAs.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2045917889

> In the last run, it seems that JDK 21 and Scala 2.13 didn't complete. Could you trigger another build? Typically, this could be done by closing the PR, waiting for 20 secs and opening it again.

Thanks for the tip. I rebased the code to trigger QA, to make sure this PR works well with the latest code :)
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2045841631

@chia7712 : Thanks for triaging the failed tests. In the last run, it seems that JDK 21 and Scala 2.13 didn't complete. Could you trigger another build? Typically, this could be done by closing the PR, waiting for 20 secs and opening it again.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2045819511

```
Build / JDK 11 and Scala 2.13 / testLowMaxFetchSizeForRequestAndPartition(String, String).quorum=kraft+kip848.groupProtocol=consumer – kafka.api.PlaintextConsumerFetchTest
```
https://issues.apache.org/jira/browse/KAFKA-16494

```
Build / JDK 11 and Scala 2.13 / testFenceMultipleBrokers() – org.apache.kafka.controller.QuorumControllerTest
```
https://issues.apache.org/jira/browse/KAFKA-15898

```
Build / JDK 17 and Scala 2.13 / testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
Build / JDK 8 and Scala 2.12 / testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest
```
https://issues.apache.org/jira/browse/KAFKA-15945

```
Build / JDK 17 and Scala 2.13 / testReplicateFromLatest() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
```
https://issues.apache.org/jira/browse/KAFKA-16383

```
Build / JDK 8 and Scala 2.12 / testCoordinatorFailover(String, String).quorum=kraft.groupProtocol=classic – kafka.api.SslConsumerTest
```
https://issues.apache.org/jira/browse/KAFKA-16024

```
Build / JDK 8 and Scala 2.12 / "testCommitTransactionTimeout(String).quorum=kraft+kip848" – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
```
https://issues.apache.org/jira/browse/KAFKA-16495

```
Build / JDK 8 and Scala 2.12 / testDescribeQuorumReplicationSuccessful [2] Type=Raft-Isolated, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest
```
https://issues.apache.org/jira/browse/KAFKA-15104

@junrao those failed tests pass locally, and they all have JIRAs now. Please review this PR again. Thanks!
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2045781013

```
Build / JDK 11 and Scala 2.13 / testLowMaxFetchSizeForRequestAndPartition(String, String).quorum=kraft+kip848.groupProtocol=consumer – kafka.api.PlaintextConsumerFetchTest
```
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2044846640

> Thanks for the updated PR. The code looks good to me. There were 50 failed tests. Is any of them related to the PR? If not, have they all been tracked?

There are many timeout exceptions, so I suspect they were caused by a busy server. I will trigger QA again instead of filing a bunch of flaky-test issues.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2043792883

@chia7712 : Thanks for the updated PR. The code looks good to me. There were 50 failed tests. Is any of them related to the PR? If not, have they all been tracked?
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2043274125

@junrao thanks for the reviews. Both comments are addressed in https://github.com/apache/kafka/pull/15621/commits/581242c1fa6c005bf91a7ced96932774c2c02cd9
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1556068446

## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ##
@@ -56,11 +60,33 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
   @AfterEach
   override def tearDown(): Unit = {
-    setOldMessageFormat = false
     Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
     super.tearDown()
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListMaxTimestampWithEmptyLog(quorum: String): Unit = {
+    val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topicName)
+    assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, maxTimestampOffset.offset())
+    assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, maxTimestampOffset.timestamp())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))
+  def testListVersion0(quorum: String): Unit = {
+    // create records for version 0
+    createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V0)
+    produceMessagesInSeparateBatch()
+
+    // update version to version 1 to list offset for max timestamp
+    createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1)
+    // the offset of max timestamp is always -1 if the batch version is 0
+    verifyListOffsets(expectedMaxTimestampOffset = -1)
+

Review Comment:
extra new line

## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ##
@@ -123,7 +149,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
   @ParameterizedTest
   @ValueSource(strings = Array("zk"))
   def testThreeRecordsInSeparateBatchWithMessageConversion(quorum: String): Unit = {
-    createOldMessageFormatBrokers()
+    createMessageFormatBrokers(RecordBatch.MAGIC_VALUE_V1)

Review Comment:
> // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
> // So in this separate batch test, it'll be the last offset 2

The comment in line 159 is not very accurate. Since we advance the time for each batch, the maxTimestampOffset is the message in the last batch.
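A small model can make junrao's point concrete. It assumes, following the quoted test comment, that with `LogAppendTime` every record in a batch shares the batch's append time and the first record of the winning batch is reported; when the clock advances between single-record batches, the winner is therefore the last batch. It also models the `testListVersion0` expectation by treating a magic v0 batch as having no timestamp. `Batch`, `UnknownOffset` and `maxTimestampOffset` are hypothetical names; `UnknownOffset = -1` is assumed to mirror `ListOffsetsResponse.UNKNOWN_OFFSET`.

```scala
object AppendTimeSketch {
  // Hypothetical batch covering offsets [baseOffset, lastOffset] (lastOffset is informational only).
  // appendTime is None for a magic v0 batch, which carries no timestamp.
  final case class Batch(baseOffset: Long, lastOffset: Long, appendTime: Option[Long])

  // Assumed sentinel for "no offset", as returned for an empty log or v0-only data.
  val UnknownOffset: Long = -1L

  // Picks the batch with the strictly largest append time (earliest batch wins ties)
  // and reports its first record's offset.
  def maxTimestampOffset(batches: Seq[Batch]): Long = {
    val stamped = batches.collect { case b @ Batch(_, _, Some(t)) => (t, b.baseOffset) }
    if (stamped.isEmpty) UnknownOffset
    else stamped.foldLeft(stamped.head) { (best, cur) => if (cur._1 > best._1) cur else best }._2
  }
}
```

Three single-record batches appended at increasing times give offset 2, whereas one three-record batch gives offset 0; that difference is why the quoted test comment's reasoning ("first message of the batch") does not by itself explain the expected value in the separate-batch test.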
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2041650425

> There are quite a few test failures on [kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15621/35/testReport/junit/kafka.server/ListOffsetsRequestTest/Build___JDK_21_and_Scala_2_13___testResponseIncludesLeaderEpoch__/).

Yep, I have fixed it locally and will update the PR later. Thanks for the reminder :)
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2041649750

@chia7712 : There are quite a few test failures on [kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15621/35/testReport/junit/kafka.server/ListOffsetsRequestTest/Build___JDK_21_and_Scala_2_13___testResponseIncludesLeaderEpoch__/).
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1554638802

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1337,13 +1337,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
       // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
       // constant time access while being safe to use with concurrent collections unlike `toArray`.
-      val segmentsCopy = logSegments.asScala.toBuffer
-      val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-      val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-      Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-        latestTimestampAndOffset.offset,
-        latestEpochAsOptional(leaderEpochCache)))
+      val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+      // cache the timestamp and offset
+      val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+      // lookup the position of batch to avoid extra I/O
+      val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+      val lpc = latestEpochAsOptional(leaderEpochCache)

Review Comment:
I fixed it in https://github.com/apache/kafka/pull/15621/commits/8b1005e4385dae14901d5b07fcf365e49bf93127
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1554155328

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1337,13 +1337,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
       // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
       // constant time access while being safe to use with concurrent collections unlike `toArray`.
-      val segmentsCopy = logSegments.asScala.toBuffer
-      val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-      val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-      Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-        latestTimestampAndOffset.offset,
-        latestEpochAsOptional(leaderEpochCache)))
+      val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+      // cache the timestamp and offset
+      val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+      // lookup the position of batch to avoid extra I/O
+      val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+      val lpc = latestEpochAsOptional(leaderEpochCache)

Review Comment:
This is an existing issue. But it seems that the epoch of MAX_TIMESTAMP was never implemented correctly. This should be the epoch corresponding to the returned offset, which is not necessarily the latest epoch. We could either fix it here or in a separate PR.
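The distinction junrao draws (epoch of the returned offset vs. latest epoch) can be sketched with a sorted map from each epoch's start offset to the epoch number: the epoch covering an offset is the entry with the greatest start offset not exceeding it. This is an illustration only; `EpochForOffsetSketch` and its methods are hypothetical and are not the `LeaderEpochFileCache` API. `rangeTo` assumes Scala 2.13 collections.

```scala
import scala.collection.immutable.TreeMap

object EpochForOffsetSketch {
  // Hypothetical view of a leader-epoch cache: start offset -> epoch.
  // An epoch covers offsets from its start offset up to (excluding) the next entry's start offset.
  def epochForOffset(startOffsetToEpoch: TreeMap[Long, Int], offset: Long): Option[Int] =
    startOffsetToEpoch.rangeTo(offset).lastOption.map(_._2) // greatest start offset <= offset

  // What the quoted code effectively returns today: the newest epoch, regardless of offset.
  def latestEpoch(startOffsetToEpoch: TreeMap[Long, Int]): Option[Int] =
    startOffsetToEpoch.lastOption.map(_._2)
}
```

For a cache with epochs 0, 1, 2 starting at offsets 0, 100, 250, a max-timestamp record at offset 50 should be reported with epoch 0, while the latest epoch is 2.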
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2041118676

```
Build / JDK 21 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
```
https://issues.apache.org/jira/browse/KAFKA-15927

```
Build / JDK 21 and Scala 2.13 / testRemoteLogReaderMetrics() – kafka.server.ReplicaManagerTest
```
https://issues.apache.org/jira/browse/KAFKA-16481

```
Build / JDK 21 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest
```
https://issues.apache.org/jira/browse/KAFKA-8115

```
Build / JDK 17 and Scala 2.13 / testIOExceptionDuringCheckpoint(String).quorum=kraft – kafka.server.LogDirFailureTest
Build / JDK 11 and Scala 2.13 / testIOExceptionDuringLogRoll(String).quorum=kraft – kafka.server.LogDirFailureTest
```
https://issues.apache.org/jira/browse/KAFKA-16234

```
Build / JDK 17 and Scala 2.13 / testFenceMultipleBrokers() – org.apache.kafka.controller.QuorumControllerTest
```
https://issues.apache.org/jira/browse/KAFKA-15898

```
Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest
```
https://issues.apache.org/jira/browse/KAFKA-16136

```
Build / JDK 8 and Scala 2.12 / testConsumptionWithBrokerFailures() – kafka.api.ConsumerBounceTest
```
https://issues.apache.org/jira/browse/KAFKA-15146

```
Build / JDK 8 and Scala 2.12 / testSendOffsetsToTransactionTimeout(String).quorum=kraft – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
Build / JDK 11 and Scala 2.13 / testAbortTransactionTimeout(String).quorum=kraft – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
```
https://issues.apache.org/jira/browse/KAFKA-15772

OK, it seems all the failed flaky tests have JIRAs now. Waiting for reviews.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2040270805

> Regarding the previous failed tests, one possibility is that the data on the server passed the retention time and is garbage collected. The default retention time is 7 days, which should be long enough. However, since we reuse mockTime, if the test runs long, the retention time might still be reached. Perhaps we could set [log.retention.ms](https://kafka.apache.org/documentation/#brokerconfigs_log.retention.ms) to -1?

```
org.opentest4j.AssertionFailedError: expected: <0> but was: <3>
```

You really hit the bullseye. I can reproduce the error by adding a short sleep before fetching data. Will set `retention.ms` to -1.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2040231361

@chia7712 : Thanks for the updated PR. Regarding the previous failed tests, one possibility is that the data on the server passed the retention time and is garbage collected. The default retention time is 7 days, which should be long enough. However, since we reuse mockTime, if the test runs long, the retention time might still be reached. Perhaps we could set [log.retention.ms](https://kafka.apache.org/documentation/#brokerconfigs_log.retention.ms) to -1?
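Why a shared `mockTime` can wipe test data is easiest to see with a simplified time-based retention check. This is a sketch under assumptions, not the broker's actual retention code: a segment is treated as expired once the clock is more than `retentionMs` past the segment's largest timestamp, and a negative value such as `-1` disables time-based deletion (the Kafka docs describe `log.retention.ms=-1` as "no time limit"). `RetentionSketch` and its members are hypothetical; only the `log.retention.ms` key comes from the comment, and how the property reaches the test brokers depends on the harness.

```scala
import java.util.Properties

object RetentionSketch {
  val SevenDaysMs: Long = 7L * 24 * 60 * 60 * 1000

  // Simplified expiry rule: negative retention means "never expire by time".
  def isExpired(nowMs: Long, segmentMaxTimestampMs: Long, retentionMs: Long): Boolean =
    retentionMs >= 0 && nowMs - segmentMaxTimestampMs > retentionMs

  // Broker override suggested in the review.
  def brokerOverrides(): Properties = {
    val props = new Properties()
    props.setProperty("log.retention.ms", "-1")
    props
  }
}
```

If a long-running test advances the mock clock past the seven-day default, `isExpired` flips to `true`, which would match the observed `expected: <0> but was: <3>` failure (the log start offset moved forward); with `-1` it stays `false` no matter how far the clock moves.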
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2038916109

The previously failed tests are gone. Rebased to trigger QA again.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1552358023 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -189,14 +215,56 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { } private def verifyListOffsets(topic: String = topicName, expectedMaxTimestampOffset: Int = 1): Unit = { -val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic) -assertEquals(0, earliestOffset.offset()) +def check(): Unit = { + val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic) + assertEquals(0, earliestOffset.offset()) -val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic) -assertEquals(3, latestOffset.offset()) + val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic) + assertEquals(3, latestOffset.offset()) + + val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic) + assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset()) +} -val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic) -assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset()) +// case 0: test the offsets from leader's append path +check() + +// case 1: test the offsets from follower's append path. 
+// we make a follower be the new leader to handle the ListOffsetRequest +def leader(): Int = adminClient.describeTopics(java.util.Collections.singletonList(topic)) + .allTopicNames().get().get(topic).partitions().get(0).leader().id() + +val previousLeader = leader() +val newLeader = brokers.map(_.config.brokerId).find(_ != previousLeader).get + +// change the leader to new one + adminClient.alterPartitionReassignments(java.util.Collections.singletonMap(new TopicPartition(topic, 0), + Optional.of(new NewPartitionReassignment(java.util.Arrays.asList(newLeader).all().get() +// wait for all reassignments get completed +waitForAllReassignmentsToComplete(adminClient) +// make sure we are able to see the new leader +var lastLeader = leader() Review Comment: It seems that we could just initialize with -1?
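The review suggestions about the leader check (record the leader the condition last observed and use it in the error message, and start from -1 instead of calling `leader()` up front) can be sketched as follows. `waitUntilTrue` here is a local, hypothetical stand-in written for this example, not Kafka's `TestUtils.waitUntilTrue`, and `leader` stands for a function like the `leader()` helper in the diff.

```scala
object WaitForLeaderSketch {
  // Hypothetical stand-in for a polling helper such as TestUtils.waitUntilTrue;
  // the name and signature are assumptions made for this sketch.
  def waitUntilTrue(condition: () => Boolean, msg: => String,
                    timeoutMs: Long = 15000L, pauseMs: Long = 100L): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (!condition()) {
      // msg is by-name, so it is only built when the wait actually fails
      if (System.currentTimeMillis() > deadline) throw new AssertionError(msg)
      Thread.sleep(pauseMs)
    }
  }

  // Waits until leader() returns `expected`. The failure message reports the
  // value the condition last saw instead of calling leader() a second time.
  def awaitLeader(expected: Int, leader: () => Int, timeoutMs: Long = 15000L): Unit = {
    var lastLeader = -1 // -1: no leader observed yet
    waitUntilTrue(() => {
      lastLeader = leader()
      lastLeader == expected
    }, s"expected leader: $expected but actual: $lastLeader", timeoutMs)
  }
}
```

Because the condition is the only place that queries the leader, the message can never disagree with the check that failed, and the metadata lookup runs once per poll.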
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621: URL: https://github.com/apache/kafka/pull/15621#issuecomment-2038064405 @junrao thanks for the reviews. I have removed the unneeded log and revised the test. Let's see what happens.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1552264764 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ## @@ -483,12 +484,13 @@ public int recover(ProducerStateManager producerStateManager, Optional maxTimestampSoFar()) { maxTimestampAndOffsetSoFar = new TimestampOffset(batch.maxTimestamp(), batch.lastOffset()); +System.out.println("[CHIA] recovery: " + maxTimestampAndOffsetSoFar); Review Comment: Is this just for testing? ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -189,14 +215,52 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { } private def verifyListOffsets(topic: String = topicName, expectedMaxTimestampOffset: Int = 1): Unit = { -val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic) -assertEquals(0, earliestOffset.offset()) +def check(): Unit = { + val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic) + assertEquals(0, earliestOffset.offset()) -val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic) -assertEquals(3, latestOffset.offset()) + val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic) + assertEquals(3, latestOffset.offset()) + + val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic) + assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset()) +} -val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic) -assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset()) +// case 0: test the offsets from leader's append path +check() + +// case 1: test the offsets from follower's append path. 
+// we make a follower be the new leader to handle the ListOffsetRequest +def leader(): Int = adminClient.describeTopics(java.util.Collections.singletonList(topic)) + .allTopicNames().get().get(topic).partitions().get(0).leader().id() + +val previousLeader = leader() +val newLeader = brokers.map(_.config.brokerId).find(_ != previousLeader).get + +// change the leader to new one + adminClient.alterPartitionReassignments(java.util.Collections.singletonMap(new TopicPartition(topic, 0), + Optional.of(new NewPartitionReassignment(java.util.Arrays.asList(newLeader).all().get() +// wait for all reassignments get completed +waitForAllReassignmentsToComplete(adminClient) +// make sure we are able to see the new leader +TestUtils.waitUntilTrue(() => newLeader == leader(), s"expected leader: $newLeader but actual: ${leader()}") Review Comment: Calling leader() again in the error message could report a different leader than the one the condition saw. Could we save the last leader and use that in the error message?
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621: URL: https://github.com/apache/kafka/pull/15621#issuecomment-2037697246 > (kafka.admin.ListOffsetsIntegrationTest.testThreeCompressedRecordsInSeparateBatch(String).quorum=kraft) failed with the following. I'm trying to reproduce it locally :(
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1552022713 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -189,14 +220,49 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { } private def verifyListOffsets(topic: String = topicName, expectedMaxTimestampOffset: Int = 1): Unit = { -val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic) -assertEquals(0, earliestOffset.offset()) +def check(): Unit = { + val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic) + assertEquals(0, earliestOffset.offset()) -val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic) -assertEquals(3, latestOffset.offset()) + val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic) + assertEquals(3, latestOffset.offset()) + + val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic) + assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset()) +} -val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic) -assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset()) +// case 0: test the offsets from leader's append path +check() + +// case 1: test the offsets from follower's append path. 
+// we make a follower be the new leader to handle the ListOffsetRequest +val partitionAssignment = adminClient.describeTopics(java.util.Collections.singletonList(topic)) + .allTopicNames().get().get(topic).partitions().get(0) +val newLeader = brokers.map(_.config.brokerId).find(_ != partitionAssignment.leader().id()).get + adminClient.alterPartitionReassignments(java.util.Collections.singletonMap(new TopicPartition(topic, 0), + Optional.of(new NewPartitionReassignment(java.util.Arrays.asList(newLeader).all().get() +waitForAllReassignmentsToComplete(adminClient) +TestUtils.waitUntilTrue(() => newLeader == adminClient.describeTopics(java.util.Collections.singletonList(topic)) + .allTopicNames().get().get(topic).partitions().get(0).leader().id(), "expected leader: " + newLeader + + ", but actual leader: " + adminClient.describeTopics(java.util.Collections.singletonList(topic)) + .allTopicNames().get().get(topic).partitions().get(0).leader().id()) Review Comment: This is a bit hard to read now. Could we do `adminClient.describeTopics` once and reuse the result?
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621: URL: https://github.com/apache/kafka/pull/15621#issuecomment-2037601997 Merged trunk to trigger QA again. Also, the error seems to happen because the leader did not change. Will check it later.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1550393556 ## clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java: ## @@ -245,4 +246,23 @@ public interface RecordBatch extends Iterable<Record> { * @return Whether this is a batch containing control records */ boolean isControlBatch(); + +/** + * iterate all records to find the offset of max timestamp. + * noted: + * 1) that the earliest offset will return if there are multi records having same (max) timestamp + * 2) it always return -1 if the {@link RecordBatch#magic()} is equal to {@link RecordBatch#MAGIC_VALUE_V0} Review Comment: return => returns
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1550271336 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -56,11 +60,38 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { @AfterEach override def tearDown(): Unit = { -setOldMessageFormat = false +version = RecordBatch.MAGIC_VALUE_V2 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") super.tearDown() } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testListMaxTimestampWithEmptyLog(quorum: String): Unit = { +val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topicName) +assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, maxTimestampOffset.offset()) +assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, maxTimestampOffset.timestamp()) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk")) + def testListVersion0(quorum: String): Unit = { Review Comment: @junrao this is the new test case for version 0: we should get `-1` if the magic value is 0.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1550270774 ## clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java: ## @@ -245,4 +246,23 @@ public interface RecordBatch extends Iterable<Record> { * @return Whether this is a batch containing control records */ boolean isControlBatch(); + +/** + * iterate all records to find the offset of max timestamp. + * noted: + * 1) that the earliest offset will return if there are multi records having same (max) timestamp + * 2) it always return -1 if the {@link RecordBatch#magic()} is equal to {@link RecordBatch#MAGIC_VALUE_V0} + * @return offset of max timestamp + */ +default Optional<Long> offsetOfMaxTimestamp() { +if (magic() == RecordBatch.MAGIC_VALUE_V0) return Optional.empty(); Review Comment: @junrao I have added the short-circuit.
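The semantics in the `offsetOfMaxTimestamp` Javadoc above (ties on the max timestamp resolve to the earliest offset; magic v0 batches produce no result) can be modeled as follows. This is a simplified sketch, not the PR's Java code: `Rec` and `MagicValueV0` are hypothetical stand-ins for Kafka's `Record` and `RecordBatch.MAGIC_VALUE_V0`, records are assumed to be in offset order, and returning `None` for an empty batch is an assumption of this sketch.

```scala
object OffsetOfMaxTimestampSketch {
  val MagicValueV0: Byte = 0 // stand-in for RecordBatch.MAGIC_VALUE_V0

  // Simplified record holding only the fields the lookup needs (hypothetical type).
  final case class Rec(offset: Long, timestamp: Long)

  // Offset of the record with the largest timestamp in one batch.
  // Records are assumed to be in offset order; magic v0 carries no timestamps.
  def offsetOfMaxTimestamp(magic: Byte, records: Seq[Rec]): Option[Long] =
    if (magic == MagicValueV0 || records.isEmpty) None
    else {
      // Strict ">" keeps the earlier record when timestamps are equal.
      val best = records.reduceLeft((a, b) => if (b.timestamp > a.timestamp) b else a)
      Some(best.offset)
    }
}
```

For a batch with timestamps 10, 30, 30 at offsets 0, 1, 2, this model picks offset 1, the earliest record carrying the max timestamp.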
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1550270383 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1337,13 +1337,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. -val segmentsCopy = logSegments.asScala.toBuffer -val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar) -val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar - -Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp, - latestTimestampAndOffset.offset, - latestEpochAsOptional(leaderEpochCache))) +val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar) +// cache the timestamp and offset +val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar +// lookup the position of batch to avoid extra I/O +val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) +val lpc = latestEpochAsOptional(leaderEpochCache) +latestTimestampSegment.log.batchesFrom(position.position).asScala Review Comment: I have addressed the comment in https://github.com/apache/kafka/pull/15621/commits/4785371c54e2fc2c540895ffe2f94829449937e6
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621: URL: https://github.com/apache/kafka/pull/15621#issuecomment-2035323125 > There were 69 test failures and quite a few of them related to ListOffset Another PR (https://github.com/apache/kafka/pull/15489) encounters the same error, where listing offsets returns an incorrect offset. I'm digging into it...
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1550055646 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1337,13 +1337,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. -val segmentsCopy = logSegments.asScala.toBuffer -val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar) -val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar - -Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp, - latestTimestampAndOffset.offset, - latestEpochAsOptional(leaderEpochCache))) +val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar) +// cache the timestamp and offset +val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar +// lookup the position of batch to avoid extra I/O +val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) +val lpc = latestEpochAsOptional(leaderEpochCache) +latestTimestampSegment.log.batchesFrom(position.position).asScala Review Comment: In the case of magic=0, we will find latestTimestampSegment with NO_TIMESTAMP. If we go through the rest of the logic, it seems that we will return the first offset instead of -1. Perhaps we should short-circuit if latestTimestampSegment is NO_TIMESTAMP?
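A simplified, hypothetical model of the max-timestamp lookup discussed in this thread, including the suggested short-circuit: pick the segment with the largest `maxTimestampSoFar`, return `None` if that value is NO_TIMESTAMP (as with magic v0 data), otherwise find the first batch whose max timestamp matches and take the earliest offset carrying it. Unlike the diff, it scans every batch instead of seeking via the offset index, and `Batch`, `Segment` and `maxTimestampAndOffset` are stand-ins, not Kafka classes. Per junrao's later comment, the real segment falls back to `(NO_TIMESTAMP, baseOffset)` when it has no timestamp data, which is why the unguarded path ends up reporting the base offset.

```scala
object MaxTimestampLookupSketch {
  val NoTimestamp: Long = -1L // same value as RecordBatch.NO_TIMESTAMP

  // Hypothetical batch: (offset, timestamp) pairs in offset order.
  final case class Batch(records: Seq[(Long, Long)]) {
    def maxTimestamp: Long =
      if (records.isEmpty) NoTimestamp else records.map(_._2).max
    // Earliest offset among the records that carry the max timestamp.
    def offsetOfMaxTimestamp: Option[Long] =
      if (maxTimestamp == NoTimestamp) None
      else records.find(_._2 == maxTimestamp).map(_._1)
  }

  // Hypothetical segment; its running max is derived from its batches.
  final case class Segment(baseOffset: Long, batches: Seq[Batch]) {
    def maxTimestampSoFar: Long =
      if (batches.isEmpty) NoTimestamp else batches.map(_.maxTimestamp).max
  }

  // (timestamp, offset) of the max timestamp, or None when no segment has a
  // timestamp, instead of falling back to a segment's base offset.
  def maxTimestampAndOffset(segments: Seq[Segment]): Option[(Long, Long)] =
    if (segments.isEmpty) None
    else {
      val latest = segments.maxBy(_.maxTimestampSoFar) // first segment on ties
      val target = latest.maxTimestampSoFar
      if (target == NoTimestamp) None // short-circuit, e.g. magic v0 data
      else
        latest.batches
          .find(_.maxTimestamp == target) // first batch holding the max timestamp
          .flatMap(b => b.offsetOfMaxTimestamp.map(offset => (target, offset)))
    }
}
```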
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621: URL: https://github.com/apache/kafka/pull/15621#issuecomment-2034230452 The failed tests pass locally. ``` ./gradlew cleanTest :streams:test --tests SlidingWindowedKStreamIntegrationTest.shouldRestoreAfterJoinRestart :storage:test --tests TransactionsWithTieredStoreTest.testSendOffsetsToTransactionTimeout :metadata:test --tests QuorumControllerTest.testFenceMultipleBrokers :trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated :connect:mirror:test --tests DedicatedMirrorIntegrationTest.testMultiNodeCluster --tests MirrorConnectorsIntegrationBaseTest.testReplicateSourceDefault --tests MirrorConnectorsIntegrationBaseTest.testOneWayReplicationWithFrequentOffsetSyncs --tests MirrorConnectorsIntegrationExactlyOnceTest.testSyncTopicConfigs :core:test --tests ListOffsetsIntegrationTest.testThreeRecordsInSeparateBatchHavingDifferentCompressionTypeWithServer --tests ListOffsetsIntegrationTest.testThreeNonCompressedRecordsInOneBatch --tests ListOffsetsIntegrationTest.testThreeRecordsInOneBatchHavingDifferentCompressionTypeWithServer --tests ListOffsetsIntegrationTest.testThreeCompressedRecordsInOneBatch --tests ListOffsetsIntegrationTest.testThreeCompressedRecordsInSeparateBatch --tests ListOffsetsIntegrationTest.testThreeNonCompressedRecordsInSeparateBatch --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithoutDescribeAclViaAssign --tests ConsumerBounceTest.testConsumptionWithBrokerFailures --tests ConsumerBounceTest.testCloseDuringRebalance --tests LogDirFailureTest.testIOExceptionDuringLogRoll --tests LogDirFailureTest.testIOExceptionDuringCheckpoint :clients:test --tests KafkaAdminClientTest.testClientSideTimeoutAfterFailureToReceiveResponse ```
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1548765182 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1337,13 +1337,17 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. -val segmentsCopy = logSegments.asScala.toBuffer -val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar) -val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar - -Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp, - latestTimestampAndOffset.offset, - latestEpochAsOptional(leaderEpochCache))) +val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar) +// cache the timestamp and offset +val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar +// lookup the position of batch to avoid extra I/O +val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) +val lpc = latestEpochAsOptional(leaderEpochCache) +Some(latestTimestampSegment.log.batchesFrom(position.position).asScala + .find(_.maxTimestamp() == maxTimestampSoFar.timestamp) + .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, lpc))) + // return the base offset for backward compatibility if there is no batches + .getOrElse(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, latestTimestampSegment.baseOffset(), lpc))) Review Comment: Sure. In https://github.com/apache/kafka/pull/15621/commits/8a7ed30692bd070fb4160a6cbc76a868484529c3 it now returns None, and I added a related test.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1548687661 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1337,13 +1337,17 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. -val segmentsCopy = logSegments.asScala.toBuffer -val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar) -val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar - -Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp, - latestTimestampAndOffset.offset, - latestEpochAsOptional(leaderEpochCache))) +val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar) +// cache the timestamp and offset +val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar +// lookup the position of batch to avoid extra I/O +val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) +val lpc = latestEpochAsOptional(leaderEpochCache) +Some(latestTimestampSegment.log.batchesFrom(position.position).asScala + .find(_.maxTimestamp() == maxTimestampSoFar.timestamp) + .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, lpc))) + // return the base offset for backward compatibility if there is no batches + .getOrElse(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, latestTimestampSegment.baseOffset(), lpc))) Review Comment: Got it. If there is no timestamp index, we initialize `maxTimestampAndOffsetSoFar` to `TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset())`. That's why it picks up the base offset. However, it doesn't seem intuitive for the user. Returning None seems better.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1548654826 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1337,13 +1337,17 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. -val segmentsCopy = logSegments.asScala.toBuffer -val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar) -val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar - -Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp, - latestTimestampAndOffset.offset, - latestEpochAsOptional(leaderEpochCache))) +val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar) +// cache the timestamp and offset +val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar +// lookup the position of batch to avoid extra I/O +val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) +val lpc = latestEpochAsOptional(leaderEpochCache) +Some(latestTimestampSegment.log.batchesFrom(position.position).asScala + .find(_.maxTimestamp() == maxTimestampSoFar.timestamp) + .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, lpc))) + // return the base offset for backward compatibility if there is no batches + .getOrElse(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, latestTimestampSegment.baseOffset(), lpc))) Review Comment: If it is fine to change the behavior, we can just return None so that the response is built with an unknown offset/timestamp. https://github.com/apache/kafka/blob/ee61bb721eecb0404929f125fe43392f3d024453/core/src/main/scala/kafka/server/KafkaApis.scala#L1146
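When the log lookup returns `None`, the partition response should carry an unknown timestamp and offset, which is what `testListMaxTimestampWithEmptyLog` asserts via `ListOffsetsResponse.UNKNOWN_OFFSET` and `ListOffsetsResponse.UNKNOWN_TIMESTAMP` (both -1). A minimal sketch of that mapping follows; the case classes are hypothetical stand-ins, and this is not the actual `KafkaApis` code linked above.

```scala
object ListOffsetsResponseSketch {
  // Same values as ListOffsetsResponse.UNKNOWN_TIMESTAMP / UNKNOWN_OFFSET.
  val UnknownTimestamp: Long = -1L
  val UnknownOffset: Long = -1L

  // Hypothetical stand-ins for the lookup result and the per-partition response.
  final case class TimestampAndOffset(timestamp: Long, offset: Long)
  final case class PartitionResponse(timestamp: Long, offset: Long)

  // None (e.g. empty log or magic v0 data) maps to unknown timestamp and offset.
  def toPartitionResponse(found: Option[TimestampAndOffset]): PartitionResponse =
    found match {
      case Some(t) => PartitionResponse(t.timestamp, t.offset)
      case None    => PartitionResponse(UnknownTimestamp, UnknownOffset)
    }
}
```

Keeping the "not found" case as `None` until the response is built leaves a single place that decides how an absent result is encoded, rather than inventing a placeholder offset inside the log layer.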
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1548644261 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1337,13 +1337,17 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. -val segmentsCopy = logSegments.asScala.toBuffer -val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar) -val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar - -Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp, - latestTimestampAndOffset.offset, - latestEpochAsOptional(leaderEpochCache))) +val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar) +// cache the timestamp and offset +val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar +// lookup the position of batch to avoid extra I/O +val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) +val lpc = latestEpochAsOptional(leaderEpochCache) +Some(latestTimestampSegment.log.batchesFrom(position.position).asScala + .find(_.maxTimestamp() == maxTimestampSoFar.timestamp) + .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, lpc))) + // return the base offset for backward compatibility if there is no batches + .getOrElse(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, latestTimestampSegment.baseOffset(), lpc))) Review Comment: I am OK with returning -1. However, it seems that we previously returned the base offset when the max timestamp was not found (no batch exists). Hence, the main reason for returning the base offset is backward compatibility.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548633965

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1337,13 +1337,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
} else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
// constant time access while being safe to use with concurrent collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
- latestTimestampAndOffset.offset,
- latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+ .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+ .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+ // return the base offset for backward compatibility if there is no batches
+ .getOrElse(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, latestTimestampSegment.baseOffset(), lpc)))

Review Comment:
Hmm, still not sure about this. If we can't find the maxTimestamp, intuitively, it seems that we should return -1 for both timestamp and offset?
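The fallback question above is easier to weigh with the lookup isolated from the rest of `UnifiedLog`. The Java sketch below is a simplification under stated assumptions: `BatchView` is a hypothetical stand-in for a record batch, `TimestampAndOffset` is a simplified stand-in for the Kafka class of the same name (the leader epoch is dropped), and `batchesFromPosition` stands for the batches read from the position found via the offset index. It returns the first batch whose max timestamp equals the cached segment value and otherwise returns whatever fallback the caller passes in.

```java
import java.util.List;

// Hypothetical sketch of the MAX_TIMESTAMP lookup discussed above; not Kafka's implementation.
final class MaxTimestampLookup {
    static final long NO_TIMESTAMP = -1L;

    // Simplified batch view: its max timestamp and the offset reported for that timestamp.
    record BatchView(long maxTimestamp, long offsetOfMaxTimestamp) {}

    // Simplified stand-in for TimestampAndOffset (leader epoch omitted).
    record TimestampAndOffset(long timestamp, long offset) {}

    // Returns the first batch whose max timestamp equals the cached segment-level value,
    // or the caller-supplied fallback when no batch matches (for example, an empty segment).
    static TimestampAndOffset lookup(List<BatchView> batchesFromPosition,
                                     long cachedMaxTimestamp,
                                     TimestampAndOffset fallback) {
        for (BatchView batch : batchesFromPosition) {
            if (batch.maxTimestamp() == cachedMaxTimestamp) {
                return new TimestampAndOffset(batch.maxTimestamp(), batch.offsetOfMaxTimestamp());
            }
        }
        return fallback;
    }
}
```

With this shape, the options debated in the thread differ only in the fallback argument: `new TimestampAndOffset(NO_TIMESTAMP, segmentBaseOffset)` corresponds to the code under review, while `new TimestampAndOffset(-1, -1)` corresponds to the suggestion in the comment.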
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548530489

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
} else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
// constant time access while being safe to use with concurrent collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
- latestTimestampAndOffset.offset,
- latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+ .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+ .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+ .getOrElse(new TimestampAndOffset(-1, 0, lpc))) // always return something for backward compatibility

Review Comment:
> why do we need to return offset=0 when we can't find the maxTimestamp?

Oh, my bad. We should return the base offset instead of zero.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548487542

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
} else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
// constant time access while being safe to use with concurrent collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
- latestTimestampAndOffset.offset,
- latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+ .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+ .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+ .getOrElse(new TimestampAndOffset(-1, 0, lpc))) // always return something for backward compatibility

Review Comment:
Hmm, why do we need to return offset=0 when we can't find the maxTimestamp?
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548438900

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
} else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
// constant time access while being safe to use with concurrent collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
- latestTimestampAndOffset.offset,
- latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+Some(latestTimestampSegment.log.batchesFrom(position.position).asScala
+ .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
+ .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, lpc)))
+ .getOrElse(new TimestampAndOffset(-1, 0, lpc))) // always return something for backward compatibility

Review Comment:
@showuon your previous comment is right (sorry, I can't find that comment, but I remember it). We need to return `offset=0 and ts=-1` if there are no batches, for the sake of backward compatibility.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548427028

## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ##
@@ -240,25 +240,54 @@ public MemoryRecords build() {
return builtRecords;
}
+
/**
- * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of the record.
- *
- * If create time is used, the offset will always be the offset of the record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
- *
- * @return The max timestamp and its offset
+ * There are three cases of finding max timestamp to return:
+ * 1) version 0: The max timestamp is NO_TIMESTAMP (-1)
+ * 2) LogAppendTime: All records have same timestamp, and so the max timestamp is equal to logAppendTime
+ * 3) CreateTime: The max timestamp of record
+ *
+ * Let's talk about OffsetOfMaxTimestamp. There are some paths that we don't try to find the OffsetOfMaxTimestamp
+ * to avoid expensive records iteration. Those paths include follower append and index recovery. In order to
+ * avoid inconsistent time index, we let all paths find shallowOffsetOfMaxTimestamp instead of OffsetOfMaxTimestamp.
+ *
+ * Let's define the shallowOffsetOfMaxTimestamp: It is last offset of the batch having max timestamp. If there are
+ * many batches having same max timestamp, we pick up the earliest batch.
+ *
+ * There are five cases of finding shallowOffsetOfMaxTimestamp to return:
+ * 1) version 0: It is always the -1
+ * 2) LogAppendTime with single batch: It is the offset of last record
+ * 3) LogAppendTime with many single-record batches: Those single-record batches have same max timestamp, so the
+ * base offset is equal with the last offset of earliest batch

Review Comment:
so the base offset is equal with the last offset of earliest batch => so we return the base offset, which is equal to the last offset of earliest batch
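The definition in the javadoc above (the last offset of the batch having the max timestamp, picking the earliest batch on ties) can be sketched as a single pass over batch-level information. This Java sketch assumes a hypothetical `Batch` record carrying only base offset, last offset, and max timestamp; it is not Kafka's `RecordBatch`. The strict `>` comparison is what lets the earliest batch win a tie, so for many single-record batches sharing one LogAppendTime the result is the base offset of the whole set. Batches without a timestamp (max timestamp -1, as with format version 0) never update the result, so both values stay -1.

```java
import java.util.List;

// Hypothetical sketch of the "shallow" offset definition; Batch is a simplified stand-in.
final class ShallowOffset {
    static final long NO_TIMESTAMP = -1L;

    record Batch(long baseOffset, long lastOffset, long maxTimestamp) {}

    // Returns {maxTimestamp, shallowOffsetOfMaxTimestamp}; both stay -1 if no batch has a timestamp.
    static long[] maxTimestampAndShallowOffset(List<Batch> batches) {
        long maxTimestamp = NO_TIMESTAMP;
        long shallowOffset = -1L;
        for (Batch batch : batches) {
            // Strict '>' keeps the earliest batch when several batches share the max timestamp.
            if (batch.maxTimestamp() > maxTimestamp) {
                maxTimestamp = batch.maxTimestamp();
                // Report the batch's last offset, not the offset of the record inside it.
                shallowOffset = batch.lastOffset();
            }
        }
        return new long[] {maxTimestamp, shallowOffset};
    }
}
```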
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548390558

## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ##
@@ -240,25 +240,54 @@ public MemoryRecords build() {
return builtRecords;
}
+
/**
- * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of the record.
- *
- * If create time is used, the offset will always be the offset of the record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
- *
- * @return The max timestamp and its offset
+ * There are three cases of finding max timestamp to return:

Review Comment:
Hi @junrao, I rewrote the whole comment to list all cases. Please take a look, thanks!
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r154830

## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ##
@@ -240,25 +240,40 @@ public MemoryRecords build() {
return builtRecords;
}
+
/**
- * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of the record.
- *
- * If create time is used, the offset will always be the offset of the record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
+ * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader append, follower append, and index recovery)
+ * The definition of shallowOffsetOfMaxTimestamp is the last offset of the batch having max timestamp.
+ * If there are many batches having same max timestamp, we pick up the earliest batch.
+ *
+ * If the log append time is used, the offset will be the last offset unless no compression is used and
+ * the message format version is 0 or 1, in which case, it will be -1.

Review Comment:
Still not very accurate. For message format version 0, the offset will be -1. For message format version 1, the offset will be the first offset.

## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ##
@@ -240,25 +240,40 @@ public MemoryRecords build() {
return builtRecords;
}
+
/**
- * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of the record.
- *
- * If create time is used, the offset will always be the offset of the record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
+ * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader append, follower append, and index recovery)
+ * The definition of shallowOffsetOfMaxTimestamp is the last offset of the batch having max timestamp.
+ * If there are many batches having same max timestamp, we pick up the earliest batch.
+ *
+ * If the log append time is used, the offset will be the last offset unless no compression is used and
+ * the message format version is 0 or 1, in which case, it will be -1.
+ *
+ * If create time is used, the offset will be the last offset unless no compression is used and the message
+ * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp.

Review Comment:
For message format version 0, the offset will be -1. For message format version 1, the offset will be the offset of the record with the max timestamp.
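The corrected wording in the two review comments above amounts to a small decision table. The Java sketch below encodes that reading and nothing more: the class, method, and parameter names are hypothetical (they are not `MemoryRecordsBuilder` members), `magic` is an `int` for brevity, and `compressed` stands for any compression type other than NONE.

```java
// Hypothetical encoding of the offset rules stated in the review comments; not Kafka's API.
final class ShallowOffsetRule {
    static final int MAGIC_V0 = 0;
    static final int MAGIC_V1 = 1;

    static long shallowOffsetOfMaxTimestamp(boolean logAppendTime, int magic, boolean compressed,
                                            long baseOffset, long lastOffset, long offsetOfMaxRecord) {
        // Format v0 carries no timestamps, so the offset is always -1.
        if (magic == MAGIC_V0) {
            return -1L;
        }
        // Uncompressed v1: every record is its own batch, so the answer is a record offset.
        if (magic == MAGIC_V1 && !compressed) {
            // LogAppendTime stamps every record identically, so the earliest record (the base
            // offset) wins; CreateTime uses the first record carrying the max timestamp.
            return logAppendTime ? baseOffset : offsetOfMaxRecord;
        }
        // Compressed v1 or any v2+ input forms a single batch: report its last offset.
        return lastOffset;
    }
}
```

Keeping v0 as an early return mirrors the point in the comments that format version 0 has no timestamp, so the offset is -1 whatever the timestamp type or compression.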
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2032614122

@junrao thanks for the additional reviews. I have addressed all of them except https://github.com/apache/kafka/pull/15621#discussion_r1548230600
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548258165

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ##
@@ -434,7 +442,8 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse
now,
records,
maxTimestamp,
-offsetOfMaxTimestamp,
+// there is only one batch in this path, so last offset can be viewed as shallowOffsetOfMaxTimestamp
+lastOffset,

Review Comment:
If the magic is 0, we don't reach this path. Instead, we run `buildRecordsAndAssignOffsets` to handle version 0. See https://github.com/apache/kafka/blob/ee61bb721eecb0404929f125fe43392f3d024453/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L343
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1548211287

## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ##
@@ -240,25 +240,40 @@ public MemoryRecords build() {
return builtRecords;
}
+
/**
- * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of the record.
- *
- * If create time is used, the offset will always be the offset of the record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
+ * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader append, follower append, and index recovery)
+ * The definition of shallowOffsetOfMaxTimestamp is the last offset of the batch which having max timestamp.

Review Comment:
which having => having

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ##
@@ -293,14 +293,29 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
if (timestampType == TimestampType.LOG_APPEND_TIME) {
maxTimestamp = now;
-offsetOfMaxTimestamp = initialOffset;
+// those checks should be equal to MemoryRecordsBuilder#info
+switch (toMagic) {
+case RecordBatch.MAGIC_VALUE_V0:
+// value will be the default value: -1
+shallowOffsetOfMaxTimestamp = -1;

Review Comment:
maxTimestamp should be NO_TIMESTAMP if magic is 0.

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ##
@@ -434,7 +442,8 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse
now,
records,
maxTimestamp,
-offsetOfMaxTimestamp,
+// there is only one batch in this path, so last offset can be viewed as shallowOffsetOfMaxTimestamp
+lastOffset,

Review Comment:
If magic is 0, we should set both maxTimestamp and shallowOffsetOfMaxTimestamp to -1.

## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ##
@@ -240,25 +240,40 @@ public MemoryRecords build() {
return builtRecords;
}
+
/**
- * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of the record.
- *
- * If create time is used, the offset will always be the offset of the record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
+ * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader append, follower append, and index recovery)
+ * The definition of shallowOffsetOfMaxTimestamp is the last offset of the batch which having max timestamp.
+ * If there are many batches having same max timestamp, we pick up the earliest batch.
+ *
+ * If the log append time is used, the offset will be the last offset unless no compression is used and
+ * the message format version is 0 or 1, in which case, it will be the first offset.

Review Comment:
For message format 0, offset is always -1. Ditto below.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2030856554

@junrao thanks for the reviews. I have addressed all comments and added a new test, `testLogAppendTimeNonCompressedV0`, to cover v0.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1546994725

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ##
@@ -275,7 +278,7 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && maxBatchTimestamp > maxTimestamp) {
maxTimestamp = maxBatchTimestamp;
-offsetOfMaxTimestamp = offsetOfMaxBatchTimestamp;
+shallowOffsetOfMaxTimestamp = offsetOfMaxBatchTimestamp;

Review Comment:
> Do we have test cases covering that?

There is already a unit test, and I will add more tests to cover it: https://github.com/apache/kafka/blob/cc6b919212ae62d75850214ae2c93379b78ff325/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala#L408
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1546966401

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java: ##
@@ -259,7 +259,7 @@ public String toString() {
", lastOffset=" + lastOffset +
", lastLeaderEpoch=" + lastLeaderEpoch +
", maxTimestamp=" + maxTimestamp +
-", offsetOfMaxTimestamp=" + offsetOfMaxTimestamp +
+", offsetOfMaxTimestamp=" + shallowOffsetOfMaxTimestamp +

Review Comment:
offsetOfMaxTimestamp => shallowOffsetOfMaxTimestamp

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ##
@@ -68,17 +68,20 @@ public static class ValidationResult {
public final long logAppendTimeMs;
public final MemoryRecords validatedRecords;
public final long maxTimestampMs;
-public final long offsetOfMaxTimestampMs;
+// we only maintain batch level offset for max timestamp since we want to align the behavior of updating time
+// indexing entries. The paths of follower sync and replica recovery do not iterate all records, so they have no

Review Comment:
follower sync => follower append

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ##
@@ -275,7 +278,7 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && maxBatchTimestamp > maxTimestamp) {
maxTimestamp = maxBatchTimestamp;
-offsetOfMaxTimestamp = offsetOfMaxBatchTimestamp;
+shallowOffsetOfMaxTimestamp = offsetOfMaxBatchTimestamp;

Review Comment:
We need to set shallowOffsetOfMaxTimestamp to the last offset in the batch. Do we have test cases covering that?

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java: ##
@@ -117,7 +117,7 @@ public LogAppendInfo(long firstOffset,
this.lastOffset = lastOffset;
this.lastLeaderEpoch = lastLeaderEpoch;
this.maxTimestamp = maxTimestamp;
-this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
+this.shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;

Review Comment:
Could we rename offsetOfMaxTimestamp in the input param and the javadoc accordingly? It would be useful to rename the local val in `UnifiedLog.analyzeAndValidateRecords` too.

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ##
@@ -293,14 +296,20 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
if (timestampType == TimestampType.LOG_APPEND_TIME) {
maxTimestamp = now;
-offsetOfMaxTimestamp = initialOffset;
+if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
+// case 0: there is only one batch so use the last offset
+shallowOffsetOfMaxTimestamp = offsetCounter.value - 1;
+else
+// case 1: Those single-record batches have same max timestamp, so the initial offset is equal with
+// the last offset of earliest batch
+shallowOffsetOfMaxTimestamp = initialOffset;

Review Comment:
For MAGIC_VALUE_V0, shallowOffsetOfMaxTimestamp should be -1.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2030454105

@junrao thanks for all your reviews and patience. All comments are addressed.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1546692495

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##
@@ -232,17 +233,17 @@ private boolean canConvertToRelativeOffset(long offset) throws IOException {
*
* @param largestOffset The last offset in the message set
* @param largestTimestampMs The largest timestamp in the message set.
- * @param offsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
+ * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.

Review Comment:
Could we make the comment clear that the last offset is in the batch?

## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ##
@@ -240,25 +240,39 @@ public MemoryRecords build() {
return builtRecords;
}
+
/**
- * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
- * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
- *
- * If the log append time is used, the offset will be the first offset of the record.
- *
- * If create time is used, the offset will always be the offset of the record with the max timestamp.
- *
- * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
+ * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader append, follower append, and index recovery)
+ * The definition of shallowOffsetOfMaxTimestamp is the last offset of the batch which having max timestamp.
+ * If there are many batches having same max timestamp, we pick up the earliest batch.
+ *
+ * If the log append time is used, the offset will be the last offset unless no compression is used and
+ * the message format version is 0 or 1, in which case, it will be the first offset.
+ *
+ * If create time is used, the offset will be the last offset unless no compression is used and the message
+ * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp.
*
* @return The max timestamp and its offset
*/
public RecordsInfo info() {
if (timestampType == TimestampType.LOG_APPEND_TIME) {
-return new RecordsInfo(logAppendTime, baseOffset);
+if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
+// case 0: there is only one batch so use the last offset
+return new RecordsInfo(logAppendTime, lastOffset);
+else
+// case 1: there are many single-record batches having same max timestamp, so the base offset is
+// equal with the last offset of earliest batch
+return new RecordsInfo(logAppendTime, baseOffset);
+} else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
+return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
} else {
-// For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping
-// If it's MAGIC_VALUE_V0, the value will be the default value: [-1, -1]
-return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);
+if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
+// ditto to case 0
+return new RecordsInfo(maxTimestamp, lastOffset);
+else
+// case 2: Each batch is composed of single record, and offsetOfMaxTimestamp points to the record having
+// max timestamp. Hence, offsetOfMaxTimestamp is equal to the last offset of earliest batch (record)

Review Comment:
of earliest batch => of earliest batch with max timestamp?

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ##
@@ -293,14 +292,13 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
if (timestampType == TimestampType.LOG_APPEND_TIME) {
maxTimestamp = now;
-offsetOfMaxTimestamp = initialOffset;
}
return new ValidationResult(
now,
records,
maxTimestamp,
-offsetOfMaxTimestamp,
+shallowOffsetOfMaxTimestamp,

Review Comment:
We need to add the following code back, right?
```
if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
  offsetOfMaxTimestamp = offsetCounter.value - 1;
}
```

## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ##
@@ -240,25 +240,39 @@ public MemoryRecords build() {
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2029213022

Rebased the code and applied Luke's patch from https://github.com/chia7712/kafka/pull/3/files
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2027904152

> I think both are important. First, it's important to be able to derive the same thing consistently from the leader and the follower log. This affects things like the time indexing entries. It will be confusing if the leader adds an offset in the middle of a batch while the follower adds an offset at the end of the batch. Second, it's important to name things as accurately as possible. Otherwise, future developers could make inaccurate assumptions.

You are right. I have reverted the implementation and the naming. I also added extra comments describing the "spec" of offsetOfMaxTimestamp.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2027526301

> Do we need to revert all of them? the paths we had fixed works well now.
>
> 1. It seems to me adding comments for both "recover" and "follower" cases can remind readers that this offsetOfMaxTimestampMs is shallow.
> 2. or we can only rename offsetForMaxTimestamp back to shallowOffsetMaxTimestamp but we keep the implementation.

@chia7712 : I think both are important. First, it's important to be able to derive the same thing consistently from the leader and the follower log. This affects things like the time indexing entries. It will be confusing if the leader adds an offset in the middle of a batch while the follower adds an offset at the end of the batch. Second, it's our responsibility to name things as accurately as possible. Otherwise, future developers could make inaccurate assumptions.
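The consistency argument above is easiest to see on a single multi-record batch. In this Java sketch, `Rec` and `Batch` are hypothetical records, not Kafka types. The "deep" answer has to iterate the records and lands in the middle of the batch, while the "shallow" answer needs only batch-level information, which is all the follower-append and index-recovery paths look at. If the leader indexed the deep offset and the follower the shallow one, their time index entries for the same batch would differ.

```java
import java.util.List;

// Hypothetical illustration of deep vs. shallow offsets of the max timestamp; not Kafka code.
final class DeepVsShallow {
    record Rec(long offset, long timestamp) {}

    // Assumes a non-empty batch whose records are in offset order.
    record Batch(List<Rec> records) {
        long lastOffset() {
            return records.get(records.size() - 1).offset();
        }

        // Derived by iteration here for brevity; in the v2 format the max timestamp is stored
        // in the batch header, so it can be read without touching the records.
        long maxTimestamp() {
            long max = -1L;
            for (Rec r : records) {
                max = Math.max(max, r.timestamp());
            }
            return max;
        }
    }

    // "Deep" answer: offset of the first record carrying the max timestamp (requires iteration).
    static long deepOffsetOfMaxTimestamp(Batch batch) {
        long max = batch.maxTimestamp();
        for (Rec r : batch.records()) {
            if (r.timestamp() == max) {
                return r.offset();
            }
        }
        return -1L;
    }

    // "Shallow" answer: the batch's last offset, available without iterating the records.
    static long shallowOffsetOfMaxTimestamp(Batch batch) {
        return batch.lastOffset();
    }
}
```

For a batch holding offsets 0, 1, 2 with timestamps 100, 300, 200, the deep answer is 1 and the shallow answer is 2, which is exactly the "middle of a batch" versus "end of the batch" mismatch described in the comment.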
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1544283639

## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ##
@@ -189,14 +190,47 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
}
private def verifyListOffsets(topic: String = topicName, expectedMaxTimestampOffset: Int = 1): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic)
-assertEquals(0, earliestOffset.offset())
+def check(): Unit = {
+  val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic)
+  assertEquals(0, earliestOffset.offset())
-val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic)
-assertEquals(3, latestOffset.offset())
+  val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic)
+  assertEquals(3, latestOffset.offset())
-val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic)
-assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+  val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic)
+  assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+}
+
+// case 0: test the offsets from leader's append path
+check()
+
+// case 1: test the offsets from follower's append path.

Review Comment:
@junrao the extra tests have been added. Please take a look.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621: URL: https://github.com/apache/kafka/pull/15621#issuecomment-2026641230

> (1) revert all offsetForMaxTimestamp to shallowOffsetMaxTimestamp (2) change/revert the implementation to set shallowOffsetMaxTimestamp accordingly.

Do we need to revert all of them? The paths we had fixed work well now.

1) It seems to me that adding comments for both the "recover" and "follower" cases can remind readers that this `offsetOfMaxTimestampMs` is shallow.
2) Or we could only rename `offsetForMaxTimestamp` back to `shallowOffsetMaxTimestamp` but keep the implementation.

@junrao WDYT?

> (3) add tests for follower appends

Will complete it later.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1544046500

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
} else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
// constant time access while being safe to use with concurrent collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+latestTimestampSegment.log.batchesFrom(position.position).asScala
+  .filter(_.maxTimestamp() == maxTimestampSoFar.timestamp)

Review Comment:
> We could stop after finding the first batch matching maxTimestamp.

Oh, sorry for neglecting that.

> Although it should be impossible, should we handle the empty() case?

Just let `KafkaApis` return an error response with `UNKNOWN`: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1146
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
showuon commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1543916425

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
+  .filter(_.maxTimestamp() == maxTimestampSoFar.timestamp)

Review Comment:
Although it should be impossible, should we handle the `empty()` case?
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
showuon commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1543906755

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
+  .filter(_.maxTimestamp() == maxTimestampSoFar.timestamp)

Review Comment:
Good suggestion. We should use `find` here.
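A small Scala sketch of why `find` is preferable to `filter` at this spot. It assumes a hypothetical `Batch` case class and a counting `Iterable` that stands in for lazily loaded batches; neither is a Kafka type. On a plain `Iterable` like the one here, `filter` builds a strict collection and therefore pulls every batch, while `find` stops at the first match. `find` also returns an `Option`, so the "should be impossible" empty case surfaces as `None` rather than an exception.

```scala
object FindVsFilter {
  // Hypothetical stand-in for a record batch; only base offset and max timestamp matter here.
  final case class Batch(baseOffset: Long, maxTimestamp: Long)

  // An Iterable that counts how many batches are pulled from it, imitating a log
  // where each pulled batch costs a read. Not a Kafka type.
  final class CountingBatches(underlying: Seq[Batch]) extends Iterable[Batch] {
    var pulled: Int = 0
    override def iterator: Iterator[Batch] =
      underlying.iterator.map { b => pulled += 1; b }
  }

  // filter on a plain Iterable builds a strict collection, so every batch is pulled.
  def firstMatchWithFilter(batches: Iterable[Batch], target: Long): Option[Batch] =
    batches.filter(_.maxTimestamp == target).headOption

  // find stops at the first match; None covers the "no matching batch" case.
  def firstMatchWithFind(batches: Iterable[Batch], target: Long): Option[Batch] =
    batches.find(_.maxTimestamp == target)
}
```

With four batches whose max timestamps are 10, 30, 30, 20 and a target of 30, both functions return the batch at base offset 5, but the `filter` version pulls all four batches while the `find` version pulls only two.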
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1543754297

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1337,13 +1337,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
+  .filter(_.maxTimestamp() == maxTimestampSoFar.timestamp)

Review Comment:
We could stop after finding the first batch matching maxTimestamp.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15618: URL: https://github.com/apache/kafka/pull/15618#issuecomment-2026013673

> I am just saying that we only need to fix this in trunk since the implementation was never correct in any previous branches, thus not a regression.

Got it. I will open another PR for trunk.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 closed pull request #15618: KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… URL: https://github.com/apache/kafka/pull/15618
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on PR #15618: URL: https://github.com/apache/kafka/pull/15618#issuecomment-2026000277

> I am not sure I understand this. All we need for this solution (or workaround) is the "max timestamp" of a segment, since we always iterate the batches (from the segment having the max timestamp) to find the "offset" of max timestamp when handling the ListOffsetsRequest.MAX_TIMESTAMP. Hence, we can correct the implementation for all active branches (including 3.6.3) with this PR.

Yes, I agree that we can fix this issue completely. I am just saying that we only need to fix this in trunk since the implementation was never correct in any previous branches, thus not a regression.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15618: URL: https://github.com/apache/kafka/pull/15618#discussion_r1543560539

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// constant time access while being safe to use with concurrent collections unlike `toArray`.
val segmentsCopy = logSegments.toBuffer
val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = latestTimestampSegment.maxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
+val batch = latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

Review Comment:
Yes. Suppose you have a 1GB segment and the maxTimestamp is in the last batch. `latestTimestampSegment.log.batches()` needs to read 1GB from disk. Using the offsetIndex, we only need to read the index plus about index.interval (default 4KB) worth of bytes.

> Is the impl of lookup like this?

Yes, that's what I was thinking.
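The numbers in that example can be checked with a quick back-of-the-envelope calculation. This Scala sketch assumes a 1 GiB segment, the default index interval of 4096 bytes, and 8-byte offset-index entries (a 4-byte relative offset plus a 4-byte file position). It is an estimate under those assumptions, not a measurement.

```scala
object IndexLookupCost {
  val SegmentBytes: Long = 1L << 30         // assumed 1 GiB segment, as in the example
  val IndexIntervalBytes: Long = 4096L      // default index interval (4 KiB)
  val OffsetIndexEntryBytes: Long = 8L      // 4-byte relative offset + 4-byte file position

  // Roughly one index entry is added per interval of appended bytes.
  val maxIndexEntries: Long = SegmentBytes / IndexIntervalBytes

  // Approximate size of the offset index for such a segment.
  val indexBytes: Long = maxIndexEntries * OffsetIndexEntryBytes

  // Number of times a binary search halves n sorted entries down to one: ceil(log2(n)), n >= 1.
  def halvings(n: Long): Int = 64 - java.lang.Long.numberOfLeadingZeros(n - 1)
}
```

So a full scan may touch up to 1 GiB, whereas an index lookup needs about 18 halvings over an index of roughly 2 MiB (262,144 entries), followed by a scan of roughly one index interval (plus any batch that straddles it) from the returned position.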
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15618: URL: https://github.com/apache/kafka/pull/15618#issuecomment-2025943523

> Since the follower only maintains offsetForMaxTimestamp at the batch level, the listMaxTimestamp API was never implemented correctly.

I am not sure I understand this. All we need for this solution (or workaround) is the "max timestamp" of a segment, since we always iterate the batches (from the segment having the max timestamp) to find the "offset" of max timestamp when handling `ListOffsetsRequest.MAX_TIMESTAMP`. Hence, we can correct the implementation for all active branches (including 3.6.3) with this PR.

> So, technically, there was no regression for listMaxTimestamp. It seems there is no need to fix this in 3.6? We could just fix it in trunk.

BTW, I'm OK with keeping the 3.6 behavior as is, since it is not really a "regression".
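The approach described in that comment (only the segment's max timestamp is needed up front; the batches of that segment are then scanned) can be sketched in Scala as follows. The `Record`, `Batch`, and `Segment` types and the `batch` builder are a hypothetical in-memory model, not Kafka's `LogSegment` or `RecordBatch`, and the last step of locating the record inside the batch is an assumption about how a record-level offset would be derived.

```scala
object MaxTimestampLookup {
  // Hypothetical in-memory model; not Kafka's LogSegment or RecordBatch.
  final case class Record(offset: Long, timestamp: Long)

  final case class Batch(records: Vector[Record]) {
    require(records.nonEmpty, "a batch must contain at least one record")
    def maxTimestamp: Long = records.map(_.timestamp).max
  }

  final case class Segment(batches: Vector[Batch]) {
    require(batches.nonEmpty, "a segment must contain at least one batch")
    // The only segment-level metadata this approach needs up front.
    def maxTimestampSoFar: Long = batches.map(_.maxTimestamp).max
  }

  // Convenience builder: batch(offset -> timestamp, ...).
  def batch(records: (Long, Long)*): Batch =
    Batch(records.map { case (offset, ts) => Record(offset, ts) }.toVector)

  /** (max timestamp, offset of the first record with it), or None if there are no segments. */
  def offsetOfMaxTimestamp(segments: Seq[Segment]): Option[(Long, Long)] =
    if (segments.isEmpty) None
    else {
      // 1. Pick the segment with the largest max timestamp.
      val segment = segments.maxBy(_.maxTimestampSoFar)
      val target = segment.maxTimestampSoFar
      // 2. Scan that segment's batches, stopping at the first one carrying the timestamp.
      // 3. Inside that batch, take the first record with the timestamp.
      for {
        b <- segment.batches.find(_.maxTimestamp == target)
        r <- b.records.find(_.timestamp == target)
      } yield (target, r.offset)
    }
}
```

For example, with one segment holding `batch(0L -> 10L, 1L -> 50L)` and a second holding `batch(2L -> 20L, 3L -> 60L, 4L -> 40L)` and `batch(5L -> 60L)`, the sketch returns `Some((60L, 3L))`: the second segment wins, its first batch is the first to carry 60, and offset 3 is the record inside it.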
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15618: URL: https://github.com/apache/kafka/pull/15618#discussion_r1543505409

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
+val batch = latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

Review Comment:
> latestTimestampSegment.log.batches() scans the whole log segment and could introduce unnecessary extra I/O. So, there could be performance degradation because of that.

`batches` is an `Iterable`, and its implementation loads a batch only when we call `next`: https://github.com/apache/kafka/blob/3.6/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java#L63

Hence, the benefit of looking up the batch (finding its position and then passing that position to `batchesFrom`) is that we can save some I/O by skipping the preceding batches. Please correct me if I misunderstand anything.

> I am not sure I understand this. Looking up a batch by either its baseOffset or its lastOffset will locate the same batch using the offset index, right?

Is the impl of lookup like this?

```scala
val position = latestTimestampSegment.offsetIndex.lookup(latestTimestampSegment.offsetOfMaxTimestampSoFar)
latestTimestampSegment.log.batchesFrom(position.position).asScala
```
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on PR #15618: URL: https://github.com/apache/kafka/pull/15618#issuecomment-2025853565

@chia7712: Since the follower only maintains offsetForMaxTimestamp at the batch level, the listMaxTimestamp API was never implemented correctly. So, technically, there was no regression for listMaxTimestamp. It seems there is no need to fix this in 3.6? We could just fix it in trunk.
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15618: URL: https://github.com/apache/kafka/pull/15618#discussion_r1543430656

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
+val batch = latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

Review Comment:
@chia7712 : `latestTimestampSegment.log.batches()` scans the whole log segment and could introduce unnecessary extra I/O, so there could be a performance degradation.

> Hence we have to use the condition baseOffset <= offset <= lastOffset to find the batch.

I am not sure I understand this. Looking up a batch by either its baseOffset or its lastOffset will locate the same batch using the offset index, right?
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15618: URL: https://github.com/apache/kafka/pull/15618#discussion_r1543390343

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
+val batch = latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

Review Comment:
@junrao thanks for the feedback. I use the max timestamp to find the "first" batch instead of using the offset index. It seems to me that using the max timestamp is simpler, since the offset stored by `maxTimestampAndOffsetSoFar` could be either the last offset or the offset of the max timestamp. Hence, with the offset we would have to use the condition `baseOffset <= offset <= lastOffset` to find the batch. I'm OK with using the offset if using the max timestamp to find the first batch has any side effect.
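The two ways of locating the batch discussed in this comment can be compared with a small Scala sketch over hypothetical `BatchMeta(baseOffset, lastOffset, maxTimestamp)` values (not a Kafka type). Matching by timestamp needs only equality. Matching by offset needs the range check, because the stored offset may be either the exact record offset or the batch's last offset.

```scala
object LocateBatch {
  // Hypothetical batch metadata, not a Kafka type.
  final case class BatchMeta(baseOffset: Long, lastOffset: Long, maxTimestamp: Long)

  // Locate by timestamp: plain equality on the batch-level max timestamp.
  def byTimestamp(batches: Seq[BatchMeta], maxTimestamp: Long): Option[BatchMeta] =
    batches.find(_.maxTimestamp == maxTimestamp)

  // Locate by offset: the stored offset may be the exact record offset or the
  // batch's last offset, so test the whole [baseOffset, lastOffset] range.
  def byOffsetRange(batches: Seq[BatchMeta], offset: Long): Option[BatchMeta] =
    batches.find(b => b.baseOffset <= offset && offset <= b.lastOffset)
}
```

In this toy model both predicates return the same batch as long as the stored offset points into the first batch that carries the segment's max timestamp. If an earlier batch had the same timestamp, `byTimestamp` would return that earlier batch instead, which is one possible "side effect" worth checking.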
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15618: URL: https://github.com/apache/kafka/pull/15618#discussion_r1543349555

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1320,10 +1320,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
+val batch = latestTimestampSegment.log.batches().asScala.maxBy(_.maxTimestamp())

Review Comment:
Hmm, iterating all batches can be expensive. We could use the offset index to find the batch containing the offset in maxTimestampAndOffsetSoFar.
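A minimal Scala sketch of the floor lookup that an offset index enables. It assumes a sparse, sorted, in-memory index of hypothetical `IndexEntry(offset, position)` pairs with absolute offsets. Kafka's `OffsetIndex` stores offsets relative to the segment's base offset and is memory-mapped, which this sketch ignores. The returned position is where a scan would start, as in the `batchesFrom(position.position)` call quoted earlier in this thread.

```scala
object OffsetIndexLookup {
  // Hypothetical sparse index entry: absolute offset -> byte position in the segment file.
  final case class IndexEntry(offset: Long, position: Int)

  /**
   * Floor lookup by binary search: the entry with the largest offset <= target.
   * If target precedes every entry, fall back to (baseOffset, 0), i.e. scan from the start.
   * Assumes `entries` is sorted by offset.
   */
  def lookup(entries: IndexedSeq[IndexEntry], baseOffset: Long, target: Long): IndexEntry = {
    var lo = 0
    var hi = entries.length - 1
    var found = IndexEntry(baseOffset, 0)
    while (lo <= hi) {
      val mid = (lo + hi) >>> 1
      if (entries(mid).offset <= target) {
        found = entries(mid) // candidate; keep looking to the right for a closer one
        lo = mid + 1
      } else {
        hi = mid - 1
      }
    }
    found
  }
}
```

Because the index is sparse, the entry returned is at or before the batch that contains the target offset, so the subsequent scan only has to cover the bytes between that entry and the batch, instead of the whole segment.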